You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by su...@apache.org on 2019/04/12 17:02:27 UTC

[arrow] branch master updated: ARROW-5127: [Rust] [Parquet] Add page iterator.

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 28db947  ARROW-5127: [Rust] [Parquet] Add page iterator.
28db947 is described below

commit 28db9474f45f66f6101d57621a1c3f47e62fe0fc
Author: Renjie Liu <li...@gmail.com>
AuthorDate: Fri Apr 12 10:02:04 2019 -0700

    ARROW-5127: [Rust] [Parquet] Add page iterator.
    
    Add page iterator.
    
    Author: Renjie Liu <li...@gmail.com>
    
    Closes #4136 from liurenjie1024/arrow-5127 and squashes the following commits:
    
    8a260062 <Renjie Liu> Fix some comment
    54207e49 <Renjie Liu> Add index out of bound error
    919d3566 <Renjie Liu> Fix some comments
    143446d2 <Renjie Liu> Add page iterator.
---
 rust/parquet/src/column/page.rs |  10 ++++
 rust/parquet/src/errors.rs      |   4 ++
 rust/parquet/src/file/reader.rs | 105 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 118 insertions(+), 1 deletion(-)

diff --git a/rust/parquet/src/column/page.rs b/rust/parquet/src/column/page.rs
index 9e0c76f..af86b02 100644
--- a/rust/parquet/src/column/page.rs
+++ b/rust/parquet/src/column/page.rs
@@ -20,6 +20,7 @@
 use crate::basic::{Encoding, PageType};
 use crate::errors::Result;
 use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 use crate::util::memory::ByteBufferPtr;
 
 /// Parquet Page definition.
@@ -217,6 +218,15 @@ pub trait PageWriter {
     fn close(&mut self) -> Result<()>;
 }
 
+/// An iterator yielding page readers for one specific column of a parquet file.
+pub trait PageIterator: Iterator<Item = Result<Box<PageReader>>> {
+    /// Returns the schema descriptor of the parquet file.
+    fn schema(&mut self) -> Result<SchemaDescPtr>;
+
+    /// Returns the descriptor of the column this page iterator covers.
+    fn column_schema(&mut self) -> Result<ColumnDescPtr>;
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/parquet/src/errors.rs b/rust/parquet/src/errors.rs
index 1ab51a1..fe19c1f 100644
--- a/rust/parquet/src/errors.rs
+++ b/rust/parquet/src/errors.rs
@@ -58,6 +58,10 @@ quick_error! {
               description(message)
               from(e: ArrowError) -> (format!("underlying Arrow error: {:?}", e))
       }
+      IndexOutOfBound(index: usize, bound: usize) {
+          display("Index {} out of bound: {}", index, bound)
+              description("Index out of bound error")
+      }
   }
 }
 
diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs
index 90d1155..aa9d03d 100644
--- a/rust/parquet/src/file/reader.rs
+++ b/rust/parquet/src/file/reader.rs
@@ -33,6 +33,7 @@ use parquet_format::{
 use thrift::protocol::TCompactInputProtocol;
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
+use crate::column::page::PageIterator;
 use crate::column::{
     page::{Page, PageReader},
     reader::{ColumnReader, ColumnReaderImpl},
@@ -41,7 +42,9 @@ use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC};
 use crate::record::reader::RowIter;
-use crate::schema::types::{self, SchemaDescriptor, Type as SchemaType};
+use crate::schema::types::{
+    self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType,
+};
 use crate::util::{io::FileSource, memory::ByteBufferPtr};
 
 // ----------------------------------------------------------------------
@@ -560,6 +563,75 @@ impl<T: Read> PageReader for SerializedPageReader<T> {
     }
 }
 
+/// `PageIterator` backed by a `FileReader`, covering one column of a parquet file.
+pub struct FilePageIterator {
+    column_index: usize, // position of the column in the file schema
+    row_group_indices: Box<Iterator<Item = usize>>, // row groups still to be visited
+    file_reader: Rc<FileReader>, // shared reader used to open each row group
+}
+
+impl FilePageIterator {
+    /// Creates a page iterator over all row groups of the file, in ascending index order.
+    pub fn new(column_index: usize, file_reader: Rc<FileReader>) -> Result<Self> {
+        let num_row_groups = file_reader.metadata().num_row_groups();
+
+        let row_group_indices = Box::new(0..num_row_groups);
+
+        Self::with_row_groups(column_index, row_group_indices, file_reader)
+    }
+
+    /// Creates a page iterator over only the row groups in `row_group_indices`.
+    pub fn with_row_groups(
+        column_index: usize,
+        row_group_indices: Box<Iterator<Item = usize>>,
+        file_reader: Rc<FileReader>,
+    ) -> Result<Self> {
+        // Reject a column_index outside the file schema with `IndexOutOfBound`
+        let num_columns = file_reader
+            .metadata()
+            .file_metadata()
+            .schema_descr_ptr()
+            .num_columns();
+
+        if column_index >= num_columns {
+            return Err(ParquetError::IndexOutOfBound(column_index, num_columns));
+        }
+
+        // Row group indices are not validated here: the iterator may be infinite
+        Ok(Self {
+            column_index,
+            row_group_indices,
+            file_reader,
+        })
+    }
+}
+
+impl Iterator for FilePageIterator { // one page reader per row group index
+    type Item = Result<Box<PageReader>>;
+
+    fn next(&mut self) -> Option<Result<Box<PageReader>>> {
+        self.row_group_indices.next().map(|row_group_index| { // None once exhausted
+            self.file_reader
+                .get_row_group(row_group_index)
+                .and_then(|r| r.get_column_page_reader(self.column_index))
+        })
+    }
+}
+
+impl PageIterator for FilePageIterator {
+    fn schema(&mut self) -> Result<SchemaDescPtr> { // always `Ok` in this implementation
+        Ok(self
+            .file_reader
+            .metadata()
+            .file_metadata()
+            .schema_descr_ptr())
+    }
+
+    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+        self.schema().map(|s| s.column(self.column_index)) // index checked at creation
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -925,4 +997,35 @@ mod tests {
         }
         assert_eq!(page_count, 2);
     }
+
+    #[test]
+    fn test_page_iterator() {
+        let file = get_test_file("alltypes_plain.parquet");
+        let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());
+
+        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
+
+        // first row group: a page reader for column 0 is returned successfully
+        let page = page_iterator.next();
+        assert!(page.is_some());
+        assert!(page.unwrap().is_ok());
+
+        // no further row groups are expected in this file, so iteration ends
+        let page = page_iterator.next();
+        assert!(page.is_none());
+
+        let row_group_indices = Box::new(0..1);
+        let mut page_iterator =
+            FilePageIterator::with_row_groups(0, row_group_indices, file_reader.clone())
+                .unwrap();
+
+        // explicit row group 0: a page reader for column 0 is returned successfully
+        let page = page_iterator.next();
+        assert!(page.is_some());
+        assert!(page.unwrap().is_ok());
+
+        // the supplied range 0..1 is exhausted, so iteration ends
+        let page = page_iterator.next();
+        assert!(page.is_none());
+    }
 }