You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2019/12/31 12:57:07 UTC
[arrow] branch master updated: ARROW-7193: [Rust] Arrow stream
reader
This is an automated email from the ASF dual-hosted git repository.
paddyhoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ba1800c ARROW-7193: [Rust] Arrow stream reader
ba1800c is described below
commit ba1800c57265c0896c6f376d99f733abb5d67eaa
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Tue Dec 31 07:56:47 2019 -0500
ARROW-7193: [Rust] Arrow stream reader
* Moved `arrow/ipc/file/reader` to `arrow/ipc/reader`
* Renamed the file reader from `Reader` to `FileReader`
* Added a `StreamReader`
The stream reader currently terminates on a 0-byte read, but not on an EOS marker.
We might need to work on the latter later, as I could not see the marker when inspecting the test files with a hex-editor.
Closes #6013 from nevi-me/ARROW-7193 and squashes the following commits:
3161c2540 <Neville Dipale> better error handling
d278c90ab <Neville Dipale> update comments
22e1955dd <Neville Dipale> ARROW-7193: Arrow stream reader
Authored-by: Neville Dipale <ne...@gmail.com>
Signed-off-by: Paddy Horan <pa...@hotmail.com>
---
rust/arrow/src/ipc/file/mod.rs | 18 ---
rust/arrow/src/ipc/mod.rs | 2 +-
rust/arrow/src/ipc/{file => }/reader.rs | 197 +++++++++++++++++++++++++++++---
3 files changed, 182 insertions(+), 35 deletions(-)
diff --git a/rust/arrow/src/ipc/file/mod.rs b/rust/arrow/src/ipc/file/mod.rs
deleted file mode 100644
index 4953590..0000000
--- a/rust/arrow/src/ipc/file/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod reader;
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index 41f8150..8c3bc08 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -16,7 +16,7 @@
// under the License.
pub mod convert;
-pub mod file;
+pub mod reader;
pub mod gen;
diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/reader.rs
similarity index 74%
rename from rust/arrow/src/ipc/file/reader.rs
rename to rust/arrow/src/ipc/reader.rs
index e192964..511a982 100644
--- a/rust/arrow/src/ipc/file/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-//! Arrow File Reader
+//! Arrow IPC File and Stream Readers
+//!
+//! The `FileReader` and `StreamReader` have similar interfaces,
+//! however the `FileReader` expects a reader that supports `Seek`ing
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
@@ -236,6 +239,7 @@ fn create_primitive_array(
.null_bit_buffer(buffers[0].clone())
}
let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
+ // this cast is infallible, the unwrap is safe
let casted = cast(&values, data_type).unwrap();
casted.data()
} else {
@@ -264,6 +268,7 @@ fn create_primitive_array(
.null_bit_buffer(buffers[0].clone())
}
let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef;
+ // this cast is infallible, the unwrap is safe
let casted = cast(&values, data_type).unwrap();
casted.data()
} else {
@@ -350,8 +355,12 @@ fn read_record_batch(
batch: ipc::RecordBatch,
schema: Arc<Schema>,
) -> Result<Option<RecordBatch>> {
- let buffers = batch.buffers().unwrap();
- let field_nodes = batch.nodes().unwrap();
+ let buffers = batch.buffers().ok_or(ArrowError::IoError(
+ "Unable to get buffers from IPC RecordBatch".to_string(),
+ ))?;
+ let field_nodes = batch.nodes().ok_or(ArrowError::IoError(
+ "Unable to get field nodes from IPC RecordBatch".to_string(),
+ ))?;
// keep track of buffer and node index, the functions that create arrays mutate these
let mut buffer_index = 0;
let mut node_index = 0;
@@ -376,8 +385,8 @@ fn read_record_batch(
}
/// Arrow File reader
-pub struct Reader<R: Read + Seek> {
- /// Buffered reader that supports reading and seeking
+pub struct FileReader<R: Read + Seek> {
+ /// Buffered file reader that supports reading and seeking
reader: BufReader<R>,
/// The schema that is read from the file header
schema: Arc<Schema>,
@@ -391,8 +400,8 @@ pub struct Reader<R: Read + Seek> {
total_blocks: usize,
}
-impl<R: Read + Seek> Reader<R> {
- /// Try to create a new reader
+impl<R: Read + Seek> FileReader<R> {
+ /// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
@@ -426,7 +435,10 @@ impl<R: Read + Seek> Reader<R> {
let vecs = &meta_buffer.to_vec();
let message = ipc::get_root_as_message(vecs);
// message header is a Schema, so read it
- let ipc_schema: ipc::Schema = message.header_as_schema().unwrap();
+ let ipc_schema: ipc::Schema =
+ message.header_as_schema().ok_or(ArrowError::IoError(
+ "Unable to read IPC message as schema".to_string(),
+ ))?;
let schema = ipc::convert::fb_to_schema(ipc_schema);
// what does the footer contain?
@@ -441,7 +453,9 @@ impl<R: Read + Seek> Reader<R> {
reader.read_exact(&mut footer_data)?;
let footer = ipc::get_root_as_footer(&footer_data[..]);
- let blocks = footer.recordBatches().unwrap();
+ let blocks = footer.recordBatches().ok_or(ArrowError::IoError(
+ "Unable to get record batches from IPC Footer".to_string(),
+ ))?;
let total_blocks = blocks.len();
@@ -483,10 +497,15 @@ impl<R: Read + Seek> Reader<R> {
match message.header_type() {
ipc::MessageHeader::Schema => {
- panic!("Not expecting a schema when messages are read")
+ return Err(ArrowError::IoError(
+ "Not expecting a schema when messages are read".to_string(),
+ ));
}
ipc::MessageHeader::RecordBatch => {
- let batch = message.header_as_record_batch().unwrap();
+ let batch =
+ message.header_as_record_batch().ok_or(ArrowError::IoError(
+ "Unable to read IPC message as record batch".to_string(),
+ ))?;
// read the block that makes up the record batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
self.reader.seek(SeekFrom::Start(
@@ -496,9 +515,12 @@ impl<R: Read + Seek> Reader<R> {
read_record_batch(&buf, batch, self.schema())
}
- _ => unimplemented!(
- "reading types other than record batches not yet supported"
- ),
+ _ => {
+ return Err(ArrowError::IoError(
+ "Reading types other than record batches not yet supported"
+ .to_string(),
+ ));
+ }
}
} else {
Ok(None)
@@ -521,7 +543,119 @@ impl<R: Read + Seek> Reader<R> {
}
}
-impl<R: Read + Seek> RecordBatchReader for Reader<R> {
+impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
+ fn schema(&mut self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
+ self.next()
+ }
+}
+
+/// Arrow Stream reader
+pub struct StreamReader<R: Read> {
+ /// Buffered stream reader
+ reader: BufReader<R>,
+ /// The schema that is read from the stream's first message
+ schema: Arc<Schema>,
+ /// An indicator of whether the stream is complete.
+ ///
+ /// This value is set to `true` the first time the reader's `next()` returns `None`.
+ finished: bool,
+}
+
+impl<R: Read> StreamReader<R> {
+ /// Try to create a new stream reader
+ ///
+ /// The first message in the stream is the schema, the reader will fail if it does not
+ /// encounter a schema.
+ /// To check if the reader is done, use `is_finished`
+ pub fn try_new(reader: R) -> Result<Self> {
+ let mut reader = BufReader::new(reader);
+ // determine metadata length
+ let mut meta_size: [u8; 4] = [0; 4];
+ reader.read_exact(&mut meta_size)?;
+ let meta_len = u32::from_le_bytes(meta_size);
+
+ let mut meta_buffer = vec![0; meta_len as usize];
+ reader.read_exact(&mut meta_buffer)?;
+
+ let vecs = &meta_buffer.to_vec();
+ let message = ipc::get_root_as_message(vecs);
+ // message header is a Schema, so read it
+ let ipc_schema: ipc::Schema = message.header_as_schema().ok_or(
+ ArrowError::IoError("Unable to read IPC message as schema".to_string()),
+ )?;
+ let schema = ipc::convert::fb_to_schema(ipc_schema);
+
+ Ok(Self {
+ reader,
+ schema: Arc::new(schema),
+ finished: false,
+ })
+ }
+
+ /// Return the schema of the stream
+ pub fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Read the next record batch
+ pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+ if self.finished {
+ return Ok(None);
+ }
+ // determine metadata length
+ let mut meta_size: [u8; 4] = [0; 4];
+ self.reader.read_exact(&mut meta_size)?;
+ let meta_len = u32::from_le_bytes(meta_size);
+
+ if meta_len == 0 {
+ // the stream has ended, mark the reader as finished
+ self.finished = true;
+ return Ok(None);
+ }
+
+ let mut meta_buffer = vec![0; meta_len as usize];
+ self.reader.read_exact(&mut meta_buffer)?;
+
+ let vecs = &meta_buffer.to_vec();
+ let message = ipc::get_root_as_message(vecs);
+
+ match message.header_type() {
+ ipc::MessageHeader::Schema => {
+ return Err(ArrowError::IoError(
+ "Not expecting a schema when messages are read".to_string(),
+ ));
+ }
+ ipc::MessageHeader::RecordBatch => {
+ let batch =
+ message.header_as_record_batch().ok_or(ArrowError::IoError(
+ "Unable to read IPC message as record batch".to_string(),
+ ))?;
+ // read the block that makes up the record batch into a buffer
+ let mut buf = vec![0; message.bodyLength() as usize];
+ self.reader.read_exact(&mut buf)?;
+
+ read_record_batch(&buf, batch, self.schema())
+ }
+ _ => {
+ return Err(ArrowError::IoError(
+ "Reading types other than record batches not yet supported"
+ .to_string(),
+ ));
+ }
+ }
+ }
+
+ /// Check if the stream is finished
+ pub fn is_finished(&self) -> bool {
+ self.finished
+ }
+}
+
+impl<R: Read> RecordBatchReader for StreamReader<R> {
fn schema(&mut self) -> SchemaRef {
self.schema.clone()
}
@@ -560,11 +694,42 @@ mod tests {
))
.unwrap();
- let mut reader = Reader::try_new(file).unwrap();
+ let mut reader = FileReader::try_new(file).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(path);
+ assert!(arrow_json.equals_reader(&mut reader));
+ });
+ }
+
+ #[test]
+ fn read_generated_streams() {
+ let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_nested",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc/integration/0.14.1/{}.stream",
+ testdata, path
+ ))
+ .unwrap();
+
+ let mut reader = StreamReader::try_new(file).unwrap();
// read expected JSON output
let arrow_json = read_gzip_json(path);
assert!(arrow_json.equals_reader(&mut reader));
+ // the next batch must be empty
+ assert!(reader.next().unwrap().is_none());
+ // the stream must indicate that it's finished
+ assert!(reader.is_finished());
});
}