You are viewing a plain text version of this content. The canonical link for it (originally a "here" hyperlink) was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by su...@apache.org on 2019/04/12 17:02:27 UTC
[arrow] branch master updated: ARROW-5127: [Rust] [Parquet] Add page iterator.
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 28db947 ARROW-5127: [Rust] [Parquet] Add page iterator.
28db947 is described below
commit 28db9474f45f66f6101d57621a1c3f47e62fe0fc
Author: Renjie Liu <li...@gmail.com>
AuthorDate: Fri Apr 12 10:02:04 2019 -0700
ARROW-5127: [Rust] [Parquet] Add page iterator.
Add page iterator.
Author: Renjie Liu <li...@gmail.com>
Closes #4136 from liurenjie1024/arrow-5127 and squashes the following commits:
8a260062 <Renjie Liu> Fix some comment
54207e49 <Renjie Liu> Add index out of bound error
919d3566 <Renjie Liu> Fix some comments
143446d2 <Renjie Liu> Add page iterator.
---
rust/parquet/src/column/page.rs | 10 ++++
rust/parquet/src/errors.rs | 4 ++
rust/parquet/src/file/reader.rs | 105 +++++++++++++++++++++++++++++++++++++++-
3 files changed, 118 insertions(+), 1 deletion(-)
diff --git a/rust/parquet/src/column/page.rs b/rust/parquet/src/column/page.rs
index 9e0c76f..af86b02 100644
--- a/rust/parquet/src/column/page.rs
+++ b/rust/parquet/src/column/page.rs
@@ -20,6 +20,7 @@
use crate::basic::{Encoding, PageType};
use crate::errors::Result;
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
/// Parquet Page definition.
@@ -217,6 +218,15 @@ pub trait PageWriter {
fn close(&mut self) -> Result<()>;
}
+/// An iterator over pages of some specific column in a parquet file.
+pub trait PageIterator: Iterator<Item = Result<Box<PageReader>>> {
+ /// Get schema of parquet file.
+ fn schema(&mut self) -> Result<SchemaDescPtr>;
+
+ /// Get column schema of this page iterator.
+ fn column_schema(&mut self) -> Result<ColumnDescPtr>;
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/rust/parquet/src/errors.rs b/rust/parquet/src/errors.rs
index 1ab51a1..fe19c1f 100644
--- a/rust/parquet/src/errors.rs
+++ b/rust/parquet/src/errors.rs
@@ -58,6 +58,10 @@ quick_error! {
description(message)
from(e: ArrowError) -> (format!("underlying Arrow error: {:?}", e))
}
+ IndexOutOfBound(index: usize, bound: usize) {
+ display("Index {} out of bound: {}", index, bound)
+ description("Index out of bound error")
+ }
}
}
diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs
index 90d1155..aa9d03d 100644
--- a/rust/parquet/src/file/reader.rs
+++ b/rust/parquet/src/file/reader.rs
@@ -33,6 +33,7 @@ use parquet_format::{
use thrift::protocol::TCompactInputProtocol;
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
+use crate::column::page::PageIterator;
use crate::column::{
page::{Page, PageReader},
reader::{ColumnReader, ColumnReaderImpl},
@@ -41,7 +42,9 @@ use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC};
use crate::record::reader::RowIter;
-use crate::schema::types::{self, SchemaDescriptor, Type as SchemaType};
+use crate::schema::types::{
+ self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType,
+};
use crate::util::{io::FileSource, memory::ByteBufferPtr};
// ----------------------------------------------------------------------
@@ -560,6 +563,75 @@ impl<T: Read> PageReader for SerializedPageReader<T> {
}
}
+/// Implementation of page iterator for parquet file.
+pub struct FilePageIterator {
+ column_index: usize,
+ row_group_indices: Box<Iterator<Item = usize>>,
+ file_reader: Rc<FileReader>,
+}
+
+impl FilePageIterator {
+ /// Creates a page iterator for all row groups in file.
+ pub fn new(column_index: usize, file_reader: Rc<FileReader>) -> Result<Self> {
+ let num_row_groups = file_reader.metadata().num_row_groups();
+
+ let row_group_indices = Box::new(0..num_row_groups);
+
+ Self::with_row_groups(column_index, row_group_indices, file_reader)
+ }
+
+ /// Create page iterator from parquet file reader with only some row groups.
+ pub fn with_row_groups(
+ column_index: usize,
+ row_group_indices: Box<Iterator<Item = usize>>,
+ file_reader: Rc<FileReader>,
+ ) -> Result<Self> {
+ // Check that column_index is valid
+ let num_columns = file_reader
+ .metadata()
+ .file_metadata()
+ .schema_descr_ptr()
+ .num_columns();
+
+ if column_index >= num_columns {
+ return Err(ParquetError::IndexOutOfBound(column_index, num_columns));
+ }
+
+ // We don't check iterators here because iterator may be infinite
+ Ok(Self {
+ column_index,
+ row_group_indices,
+ file_reader,
+ })
+ }
+}
+
+impl Iterator for FilePageIterator {
+ type Item = Result<Box<PageReader>>;
+
+ fn next(&mut self) -> Option<Result<Box<PageReader>>> {
+ self.row_group_indices.next().map(|row_group_index| {
+ self.file_reader
+ .get_row_group(row_group_index)
+ .and_then(|r| r.get_column_page_reader(self.column_index))
+ })
+ }
+}
+
+impl PageIterator for FilePageIterator {
+ fn schema(&mut self) -> Result<SchemaDescPtr> {
+ Ok(self
+ .file_reader
+ .metadata()
+ .file_metadata()
+ .schema_descr_ptr())
+ }
+
+ fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+ self.schema().map(|s| s.column(self.column_index))
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -925,4 +997,35 @@ mod tests {
}
assert_eq!(page_count, 2);
}
+
+ #[test]
+ fn test_page_iterator() {
+ let file = get_test_file("alltypes_plain.parquet");
+ let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());
+
+ let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
+
+ // read first page
+ let page = page_iterator.next();
+ assert!(page.is_some());
+ assert!(page.unwrap().is_ok());
+
+ // reach end of file
+ let page = page_iterator.next();
+ assert!(page.is_none());
+
+ let row_group_indices = Box::new(0..1);
+ let mut page_iterator =
+ FilePageIterator::with_row_groups(0, row_group_indices, file_reader.clone())
+ .unwrap();
+
+ // read first page
+ let page = page_iterator.next();
+ assert!(page.is_some());
+ assert!(page.unwrap().is_ok());
+
+ // reach end of file
+ let page = page_iterator.next();
+ assert!(page.is_none());
+ }
}