You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by su...@apache.org on 2019/07/30 08:20:50 UTC

[arrow] branch master updated: ARROW-4365: [Rust] [Parquet] Implement arrow record reader.

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3112d34  ARROW-4365: [Rust] [Parquet] Implement arrow record reader.
3112d34 is described below

commit 3112d346962575af911a94d4cce81e9929accfe7
Author: Renjie Liu <li...@gmail.com>
AuthorDate: Tue Jul 30 01:20:04 2019 -0700

    ARROW-4365: [Rust] [Parquet] Implement arrow record reader.
    
    RecordReader reads logical records into memory, this is the prerequisite for ColumnReader.
    
    Closes #4292 from liurenjie1024/arrow-4365 and squashes the following commits:
    
    874bb23cb <Renjie Liu> Fix ci
    0c396c7ea <Renjie Liu> Fix comments
    138fabec7 <Renjie Liu> Fix one method
    e279a0009 <Renjie Liu> Fix comments
    19db497e2 <Renjie Liu> fix build break
    e6c497092 <Renjie Liu> Fix style
    8295a284c <Renjie Liu> Use backtick instead of triple
    e1da42619 <Renjie Liu> Fix comments
    5c8b6b380 <Renjie Liu> Fix format error
    1b288a5b4 <Renjie Liu> Implement record reader.
    
    Authored-by: Renjie Liu <li...@gmail.com>
    Signed-off-by: Chao Sun <su...@apache.org>
---
 rust/parquet/src/arrow/mod.rs           |   1 +
 rust/parquet/src/arrow/record_reader.rs | 716 ++++++++++++++++++++++++++++++++
 2 files changed, 717 insertions(+)

diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index fe580c5..a86ffe0 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -20,6 +20,7 @@
 //!
 //! This mod provides API for converting between arrow and parquet.
 
+pub(crate) mod record_reader;
 pub mod schema;
 
 pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs
new file mode 100644
index 0000000..6b1237c
--- /dev/null
+++ b/rust/parquet/src/arrow/record_reader.rs
@@ -0,0 +1,716 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::{max, min};
+use std::mem::replace;
+use std::mem::size_of;
+use std::mem::transmute;
+use std::slice;
+
+use crate::column::{page::PageReader, reader::ColumnReaderImpl};
+use crate::data_type::DataType;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::{BooleanBufferBuilder, BufferBuilderTrait};
+use arrow::bitmap::Bitmap;
+use arrow::buffer::{Buffer, MutableBuffer};
+
+const MIN_BATCH_SIZE: usize = 1024;
+
+/// A `RecordReader` is a stateful column reader that delimits semantic records.
+pub struct RecordReader<T: DataType> {
+    column_desc: ColumnDescPtr,
+
+    records: MutableBuffer,
+    def_levels: Option<MutableBuffer>,
+    rep_levels: Option<MutableBuffer>,
+    null_bitmap: Option<BooleanBufferBuilder>,
+    column_reader: Option<ColumnReaderImpl<T>>,
+
+    /// Number of records accumulated in records
+    num_records: usize,
+
+    values_seen: usize,
+    /// Starts from 1, number of values have been written to buffer
+    values_written: usize,
+    in_middle_of_record: bool,
+}
+
+#[derive(Debug)]
+struct FatPtr<T> {
+    ptr: *const T,
+    len: usize,
+}
+
+impl<T> FatPtr<T> {
+    fn new(ptr: *const T, len: usize) -> Self {
+        Self { ptr, len }
+    }
+
+    fn with_offset(buf: &MutableBuffer, offset: usize) -> Self {
+        FatPtr::<T>::with_offset_and_size(buf, offset, size_of::<T>())
+    }
+
+    fn with_offset_and_size(
+        buf: &MutableBuffer,
+        offset: usize,
+        type_size: usize,
+    ) -> Self {
+        unsafe {
+            FatPtr::new(
+                transmute::<*const u8, *mut T>(buf.raw_data()).add(offset),
+                buf.capacity() / type_size - offset,
+            )
+        }
+    }
+
+    fn to_slice(&self) -> &[T] {
+        unsafe { slice::from_raw_parts(self.ptr, self.len) }
+    }
+
+    fn to_slice_mut(&self) -> &mut [T] {
+        unsafe { slice::from_raw_parts_mut(self.ptr as *mut T, self.len) }
+    }
+}
+
+impl<T: DataType> RecordReader<T> {
+    pub fn new(column_schema: ColumnDescPtr) -> Self {
+        let (def_levels, null_map) = if column_schema.max_def_level() > 0 {
+            (
+                Some(MutableBuffer::new(MIN_BATCH_SIZE)),
+                Some(BooleanBufferBuilder::new(MIN_BATCH_SIZE)),
+            )
+        } else {
+            (None, None)
+        };
+
+        let rep_levels = if column_schema.max_rep_level() > 0 {
+            Some(MutableBuffer::new(MIN_BATCH_SIZE))
+        } else {
+            None
+        };
+
+        Self {
+            records: MutableBuffer::new(MIN_BATCH_SIZE),
+            def_levels,
+            rep_levels,
+            null_bitmap: null_map,
+            column_reader: None,
+            column_desc: column_schema,
+            num_records: 0,
+            values_seen: 0,
+            values_written: 0,
+            in_middle_of_record: false,
+        }
+    }
+
+    /// Set the current page reader.
+    pub fn set_page_reader(&mut self, page_reader: Box<PageReader>) -> Result<()> {
+        self.column_reader =
+            Some(ColumnReaderImpl::new(self.column_desc.clone(), page_reader));
+        Ok(())
+    }
+
+    /// Try to read `num_records` of column data into internal buffer.
+    ///
+    /// # Returns
+    ///
+    /// Number of actual records read.
+    pub fn read_records(&mut self, num_records: usize) -> Result<usize> {
+        assert!(self.column_reader.is_some());
+        let mut records_read = 0;
+
+        // Used to mark whether we have reached the end of current
+        // column chunk
+        let mut end_of_column = false;
+
+        loop {
+            // Try to find some records from buffers that has been read into memory
+            // but not counted as seen records.
+            records_read += self.split_records(num_records - records_read)?;
+
+            // Since page reader contains complete records, so if we reached end of a
+            // page reader, we should reach the end of a record
+            if end_of_column
+                && self.values_seen >= self.values_written
+                && self.in_middle_of_record
+            {
+                self.num_records += 1;
+                self.in_middle_of_record = false;
+                records_read += 1;
+            }
+
+            if (records_read >= num_records) || end_of_column {
+                break;
+            }
+
+            let batch_size = max(num_records - records_read, MIN_BATCH_SIZE);
+
+            // Try to more value from parquet pages
+            let values_read = self.read_one_batch(batch_size)?;
+            if values_read < batch_size {
+                end_of_column = true;
+            }
+        }
+
+        Ok(records_read)
+    }
+
+    /// Returns number of records stored in buffer.
+    pub fn num_records(&self) -> usize {
+        self.num_records
+    }
+
+    /// Returns definition level data.
+    pub fn consume_def_levels(&mut self) -> Option<Buffer> {
+        let empty_def_buffer = if self.column_desc.max_def_level() > 0 {
+            Some(MutableBuffer::new(MIN_BATCH_SIZE))
+        } else {
+            None
+        };
+
+        replace(&mut self.def_levels, empty_def_buffer).map(|x| x.freeze())
+    }
+
+    /// Return repetition level data
+    pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
+        let empty_def_buffer = if self.column_desc.max_rep_level() > 0 {
+            Some(MutableBuffer::new(MIN_BATCH_SIZE))
+        } else {
+            None
+        };
+
+        replace(&mut self.rep_levels, empty_def_buffer).map(|x| x.freeze())
+    }
+
+    /// Returns currently stored buffer data.
+    pub fn consume_record_data(&mut self) -> Buffer {
+        replace(&mut self.records, MutableBuffer::new(MIN_BATCH_SIZE)).freeze()
+    }
+
+    pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
+        let bitmap_builder = if self.column_desc.max_def_level() > 0 {
+            Some(BooleanBufferBuilder::new(MIN_BATCH_SIZE))
+        } else {
+            None
+        };
+
+        replace(&mut self.null_bitmap, bitmap_builder).map(|mut builder| builder.finish())
+    }
+
+    /// Returns bitmap data.
+    pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
+        self.consume_bitmap_buffer()
+            .map(|buffer| Bitmap::from(buffer))
+    }
+
+    /// Try to read one batch of data.
+    fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
+        // Reserve spaces
+        self.records
+            .reserve(self.records.len() + batch_size * T::get_type_size())?;
+        if let Some(ref mut buf) = self.rep_levels {
+            buf.reserve(buf.len() + batch_size * size_of::<i16>())
+                .map(|_| ())?;
+        }
+        if let Some(ref mut buf) = self.def_levels {
+            buf.reserve(buf.len() + batch_size * size_of::<i16>())
+                .map(|_| ())?;
+        }
+
+        // Convert mutable buffer spaces to mutable slices
+        let values_buf = FatPtr::<T::T>::with_offset_and_size(
+            &self.records,
+            self.values_written,
+            T::get_type_size(),
+        );
+
+        let mut def_levels_buf = self
+            .def_levels
+            .as_ref()
+            .map(|buf| FatPtr::<i16>::with_offset(buf, self.values_written));
+
+        let mut rep_levels_buf = self
+            .rep_levels
+            .as_ref()
+            .map(|buf| FatPtr::<i16>::with_offset(buf, self.values_written));
+
+        let (values_read, levels_read) =
+            self.column_reader.as_mut().unwrap().read_batch(
+                batch_size,
+                def_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
+                rep_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
+                values_buf.to_slice_mut(),
+            )?;
+
+        let max_def_level = self.column_desc.max_def_level();
+
+        if values_read < levels_read {
+            // This means that there are null values in column data
+            // TODO: Move this into ColumnReader
+
+            let values_buf = values_buf.to_slice_mut();
+
+            let def_levels_buf = def_levels_buf
+                .as_mut()
+                .map(|ptr| ptr.to_slice_mut())
+                .ok_or_else(|| {
+                    general_err!(
+                        "Definition levels should exist when data is less than levels!"
+                    )
+                })?;
+
+            // Fill spaces in column data with default values
+            let mut values_pos = values_read;
+            let mut level_pos = levels_read;
+
+            while level_pos > values_pos {
+                if def_levels_buf[level_pos - 1] == max_def_level {
+                    // This values is not empty
+                    // We use swap rather than assign here because T::T doesn't
+                    // implement Copy
+                    values_buf.swap(level_pos - 1, values_pos - 1);
+                    values_pos -= 1;
+                } else {
+                    values_buf[level_pos - 1] = T::T::default();
+                }
+
+                level_pos -= 1;
+            }
+        }
+
+        // Fill in bitmap data
+        if let Some(null_buffer) = self.null_bitmap.as_mut() {
+            let def_levels_buf = def_levels_buf
+                .as_mut()
+                .map(|ptr| ptr.to_slice_mut())
+                .ok_or_else(|| {
+                    general_err!(
+                        "Definition levels should exist when data is less than levels!"
+                    )
+                })?;
+            (0..levels_read).try_for_each(|idx| {
+                null_buffer.append(def_levels_buf[idx] == max_def_level)
+            })?;
+        }
+
+        let values_read = max(values_read, levels_read);
+        self.set_values_written(self.values_written + values_read)?;
+        Ok(values_read)
+    }
+
+    /// Split values into records according repetition definition and returns number of
+    /// records read.
+    fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
+        let rep_levels_buf = self
+            .rep_levels
+            .as_ref()
+            .map(|buf| FatPtr::<i16>::with_offset(buf, 0));
+        let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice());
+
+        match rep_levels_buf {
+            Some(buf) => {
+                let mut records_read = 0;
+
+                while (self.values_seen < self.values_written)
+                    && (records_read < records_to_read)
+                {
+                    if buf[self.values_seen] == 0 {
+                        if self.in_middle_of_record {
+                            records_read += 1;
+                            self.num_records += 1;
+                        }
+                        self.in_middle_of_record = true;
+                    }
+                    self.values_seen += 1;
+                }
+
+                Ok(records_read)
+            }
+            None => {
+                let records_read =
+                    min(records_to_read, self.values_written - self.values_seen);
+                self.num_records += records_read;
+                self.values_seen += records_read;
+                self.in_middle_of_record = false;
+
+                Ok(records_read)
+            }
+        }
+    }
+
+    fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
+        self.values_written = new_values_written;
+        self.records
+            .resize(self.values_written * T::get_type_size())?;
+
+        let new_levels_len = self.values_written * size_of::<i16>();
+
+        if let Some(ref mut buf) = self.rep_levels {
+            buf.resize(new_levels_len)?
+        };
+
+        if let Some(ref mut buf) = self.def_levels {
+            buf.resize(new_levels_len)?
+        };
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::RecordReader;
+    use crate::basic::Encoding;
+    use crate::column::page::Page;
+    use crate::column::page::PageReader;
+    use crate::data_type::Int32Type;
+    use crate::errors::Result;
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
+    use arrow::array::{
+        BooleanBufferBuilder, BufferBuilderTrait, Int16BufferBuilder, Int32BufferBuilder,
+    };
+    use arrow::bitmap::Bitmap;
+    use std::rc::Rc;
+
+    struct TestPageReader {
+        pages: Box<Iterator<Item = Page>>,
+    }
+
+    impl TestPageReader {
+        pub fn new(pages: Vec<Page>) -> Self {
+            Self {
+                pages: Box::new(pages.into_iter()),
+            }
+        }
+    }
+
+    impl PageReader for TestPageReader {
+        fn get_next_page(&mut self) -> Result<Option<Page>> {
+            Ok(self.pages.next())
+        }
+    }
+
+    #[test]
+    fn test_read_required_records() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Rc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        // First page
+
+        // Records data:
+        // test_schema
+        //   leaf: 4
+        // test_schema
+        //   leaf: 7
+        // test_schema
+        //   leaf: 6
+        // test_schema
+        //   left: 3
+        // test_schema
+        //   left: 2
+        {
+            let values = [4, 7, 6, 3, 2];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.read_records(2).unwrap());
+            assert_eq!(2, record_reader.num_records());
+            assert_eq!(3, record_reader.read_records(3).unwrap());
+            assert_eq!(5, record_reader.num_records());
+        }
+
+        // Second page
+
+        // Records data:
+        // test_schema
+        //   leaf: 8
+        // test_schema
+        //   leaf: 9
+        {
+            let values = [8, 9];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 2, true);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.read_records(10).unwrap());
+            assert_eq!(7, record_reader.num_records());
+        }
+
+        let mut bb = Int32BufferBuilder::new(7);
+        bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]).unwrap();
+        let expected_buffer = bb.finish();
+        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(None, record_reader.consume_def_levels());
+        assert_eq!(None, record_reader.consume_bitmap());
+    }
+
+    #[test]
+    fn test_read_optional_records() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          OPTIONAL Group test_struct {
+            OPTIONAL INT32 leaf;
+          }
+        }
+        ";
+
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Rc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        // First page
+
+        // Records data:
+        // test_schema
+        //   test_struct
+        // test_schema
+        //   test_struct
+        //     left: 7
+        // test_schema
+        // test_schema
+        //   test_struct
+        //     leaf: 6
+        // test_schema
+        //   test_struct
+        //     leaf: 6
+        {
+            let values = [7, 6, 3];
+            //empty, non-empty, empty, non-empty, non-empty
+            let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.read_records(2).unwrap());
+            assert_eq!(2, record_reader.num_records());
+            assert_eq!(3, record_reader.read_records(3).unwrap());
+            assert_eq!(5, record_reader.num_records());
+        }
+
+        // Second page
+
+        // Records data:
+        // test_schema
+        // test_schema
+        //   test_struct
+        //     left: 8
+        {
+            let values = [8];
+            //empty, non-empty
+            let def_levels = [0i16, 2i16];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 2, true);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.read_records(10).unwrap());
+            assert_eq!(7, record_reader.num_records());
+        }
+
+        // Verify result record data
+        let mut bb = Int32BufferBuilder::new(7);
+        bb.append_slice(&[0, 7, 0, 6, 3, 0, 8]).unwrap();
+        let expected_buffer = bb.finish();
+        assert_eq!(expected_buffer, record_reader.consume_record_data());
+
+        // Verify result def levels
+        let mut bb = Int16BufferBuilder::new(7);
+        bb.append_slice(&[1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16])
+            .unwrap();
+        let expected_def_levels = bb.finish();
+        assert_eq!(
+            Some(expected_def_levels),
+            record_reader.consume_def_levels()
+        );
+
+        // Verify bitmap
+        let mut bb = BooleanBufferBuilder::new(7);
+        bb.append_slice(&[false, true, false, true, true, false, true])
+            .unwrap();
+        let expected_bitmap = Bitmap::from(bb.finish());
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+    }
+
+    #[test]
+    fn test_read_repeated_records() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REPEATED Group test_struct {
+            REPEATED  INT32 leaf;
+          }
+        }
+        ";
+
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Rc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        // First page
+
+        // Records data:
+        // test_schema
+        //   test_struct
+        //     leaf: 4
+        // test_schema
+        // test_schema
+        //   test_struct
+        //   test_struct
+        //     leaf: 7
+        //     leaf: 6
+        //     leaf: 3
+        //   test_struct
+        //     leaf: 2
+        {
+            let values = [4, 7, 6, 3, 2];
+            let def_levels = [2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16];
+            let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 2i16, 1i16];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
+            pb.add_rep_levels(2, &rep_levels);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+
+            assert_eq!(1, record_reader.read_records(1).unwrap());
+            assert_eq!(1, record_reader.num_records());
+            assert_eq!(2, record_reader.read_records(3).unwrap());
+            assert_eq!(3, record_reader.num_records());
+        }
+
+        // Second page
+
+        // Records data:
+        // test_schema
+        //   test_struct
+        //     leaf: 8
+        //     leaf: 9
+        {
+            let values = [8, 9];
+            let def_levels = [2i16, 2i16];
+            let rep_levels = [0i16, 2i16];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 2, true);
+            pb.add_rep_levels(2, &rep_levels);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+
+            assert_eq!(1, record_reader.read_records(10).unwrap());
+            assert_eq!(4, record_reader.num_records());
+        }
+
+        // Verify result record data
+        let mut bb = Int32BufferBuilder::new(9);
+        bb.append_slice(&[4, 0, 0, 7, 6, 3, 2, 8, 9]).unwrap();
+        let expected_buffer = bb.finish();
+        assert_eq!(expected_buffer, record_reader.consume_record_data());
+
+        // Verify result def levels
+        let mut bb = Int16BufferBuilder::new(9);
+        bb.append_slice(&[2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16])
+            .unwrap();
+        let expected_def_levels = bb.finish();
+        assert_eq!(
+            Some(expected_def_levels),
+            record_reader.consume_def_levels()
+        );
+
+        // Verify bitmap
+        let mut bb = BooleanBufferBuilder::new(9);
+        bb.append_slice(&[true, false, false, true, true, true, true, true, true])
+            .unwrap();
+        let expected_bitmap = Bitmap::from(bb.finish());
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+    }
+
+    #[test]
+    fn test_read_more_than_one_batch() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REPEATED  INT32 leaf;
+        }
+        ";
+
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Rc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        {
+            let values = [100; 5000];
+            let def_levels = [1i16; 5000];
+            let mut rep_levels = [1i16; 5000];
+            for idx in 0..1000 {
+                rep_levels[idx * 5] = 0i16;
+            }
+
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5000, true);
+            pb.add_rep_levels(1, &rep_levels);
+            pb.add_def_levels(1, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(TestPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+
+            assert_eq!(1000, record_reader.read_records(1000).unwrap());
+            assert_eq!(1000, record_reader.num_records());
+        }
+    }
+}