Posted to commits@arrow.apache.org by pa...@apache.org on 2019/12/31 12:57:07 UTC

[arrow] branch master updated: ARROW-7193: [Rust] Arrow stream reader

This is an automated email from the ASF dual-hosted git repository.

paddyhoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ba1800c  ARROW-7193: [Rust] Arrow stream reader
ba1800c is described below

commit ba1800c57265c0896c6f376d99f733abb5d67eaa
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Tue Dec 31 07:56:47 2019 -0500

    ARROW-7193: [Rust] Arrow stream reader
    
    * Moved `arrow/ipc/file/reader` to `arrow/ipc/reader`
    * Renamed the file reader from `Reader` to `FileReader`
    * Added a `StreamReader`
    
    The stream reader currently terminates on a 0-byte read, but not on an EOS marker.
    We might need to handle the marker in a follow-up, as I could not see it when inspecting the test files with a hex editor.
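
    For illustration, a minimal sketch of how the new `StreamReader` might be
    driven (the path below is hypothetical; the calls mirror the API added here):

        use std::fs::File;
        use arrow::ipc::reader::StreamReader;

        fn main() {
            // hypothetical path; any type implementing `Read` can back the reader
            let file = File::open("batches.stream").unwrap();
            let mut reader = StreamReader::try_new(file).unwrap();
            // `next()` returns Ok(None) once the 0-length metadata marker is read
            while let Some(batch) = reader.next().unwrap() {
                println!("read a batch with {} rows", batch.num_rows());
            }
            assert!(reader.is_finished());
        }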
    
    Closes #6013 from nevi-me/ARROW-7193 and squashes the following commits:
    
    3161c2540 <Neville Dipale> better error handling
    d278c90ab <Neville Dipale> update comments
    22e1955dd <Neville Dipale> ARROW-7193:  Arrow stream reader
    
    Authored-by: Neville Dipale <ne...@gmail.com>
    Signed-off-by: Paddy Horan <pa...@hotmail.com>
---
 rust/arrow/src/ipc/file/mod.rs          |  18 ---
 rust/arrow/src/ipc/mod.rs               |   2 +-
 rust/arrow/src/ipc/{file => }/reader.rs | 197 +++++++++++++++++++++++++++++---
 3 files changed, 182 insertions(+), 35 deletions(-)

diff --git a/rust/arrow/src/ipc/file/mod.rs b/rust/arrow/src/ipc/file/mod.rs
deleted file mode 100644
index 4953590..0000000
--- a/rust/arrow/src/ipc/file/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod reader;
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index 41f8150..8c3bc08 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 pub mod convert;
-pub mod file;
+pub mod reader;
 
 pub mod gen;
 
diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/reader.rs
similarity index 74%
rename from rust/arrow/src/ipc/file/reader.rs
rename to rust/arrow/src/ipc/reader.rs
index e192964..511a982 100644
--- a/rust/arrow/src/ipc/file/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Arrow File Reader
+//! Arrow IPC File and Stream Readers
+//!
+//! The `FileReader` and `StreamReader` have similar interfaces;
+//! however, the `FileReader` expects a reader that also supports `Seek`ing.
 
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
@@ -236,6 +239,7 @@ fn create_primitive_array(
                         .null_bit_buffer(buffers[0].clone())
                 }
                 let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
+                // this cast is infallible, so the unwrap is safe
                 let casted = cast(&values, data_type).unwrap();
                 casted.data()
             } else {
@@ -264,6 +268,7 @@ fn create_primitive_array(
                         .null_bit_buffer(buffers[0].clone())
                 }
                 let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef;
+                // this cast is infallible, so the unwrap is safe
                 let casted = cast(&values, data_type).unwrap();
                 casted.data()
             } else {
@@ -350,8 +355,12 @@ fn read_record_batch(
     batch: ipc::RecordBatch,
     schema: Arc<Schema>,
 ) -> Result<Option<RecordBatch>> {
-    let buffers = batch.buffers().unwrap();
-    let field_nodes = batch.nodes().unwrap();
+    let buffers = batch.buffers().ok_or(ArrowError::IoError(
+        "Unable to get buffers from IPC RecordBatch".to_string(),
+    ))?;
+    let field_nodes = batch.nodes().ok_or(ArrowError::IoError(
+        "Unable to get field nodes from IPC RecordBatch".to_string(),
+    ))?;
     // keep track of buffer and node index, the functions that create arrays mutate these
     let mut buffer_index = 0;
     let mut node_index = 0;
@@ -376,8 +385,8 @@ fn read_record_batch(
 }
 
 /// Arrow File reader
-pub struct Reader<R: Read + Seek> {
-    /// Buffered reader that supports reading and seeking
+pub struct FileReader<R: Read + Seek> {
+    /// Buffered file reader that supports reading and seeking
     reader: BufReader<R>,
     /// The schema that is read from the file header
     schema: Arc<Schema>,
@@ -391,8 +400,8 @@ pub struct Reader<R: Read + Seek> {
     total_blocks: usize,
 }
 
-impl<R: Read + Seek> Reader<R> {
-    /// Try to create a new reader
+impl<R: Read + Seek> FileReader<R> {
+    /// Try to create a new file reader
     ///
     /// Returns errors if the file does not meet the Arrow Format header and footer
     /// requirements
@@ -426,7 +435,10 @@ impl<R: Read + Seek> Reader<R> {
         let vecs = &meta_buffer.to_vec();
         let message = ipc::get_root_as_message(vecs);
         // message header is a Schema, so read it
-        let ipc_schema: ipc::Schema = message.header_as_schema().unwrap();
+        let ipc_schema: ipc::Schema =
+            message.header_as_schema().ok_or(ArrowError::IoError(
+                "Unable to read IPC message as schema".to_string(),
+            ))?;
         let schema = ipc::convert::fb_to_schema(ipc_schema);
 
         // what does the footer contain?
@@ -441,7 +453,9 @@ impl<R: Read + Seek> Reader<R> {
         reader.read_exact(&mut footer_data)?;
         let footer = ipc::get_root_as_footer(&footer_data[..]);
 
-        let blocks = footer.recordBatches().unwrap();
+        let blocks = footer.recordBatches().ok_or(ArrowError::IoError(
+            "Unable to get record batches from IPC Footer".to_string(),
+        ))?;
 
         let total_blocks = blocks.len();
 
@@ -483,10 +497,15 @@ impl<R: Read + Seek> Reader<R> {
 
             match message.header_type() {
                 ipc::MessageHeader::Schema => {
-                    panic!("Not expecting a schema when messages are read")
+                    return Err(ArrowError::IoError(
+                        "Not expecting a schema when messages are read".to_string(),
+                    ));
                 }
                 ipc::MessageHeader::RecordBatch => {
-                    let batch = message.header_as_record_batch().unwrap();
+                    let batch =
+                        message.header_as_record_batch().ok_or(ArrowError::IoError(
+                            "Unable to read IPC message as record batch".to_string(),
+                        ))?;
                     // read the block that makes up the record batch into a buffer
                     let mut buf = vec![0; block.bodyLength() as usize];
                     self.reader.seek(SeekFrom::Start(
@@ -496,9 +515,12 @@ impl<R: Read + Seek> Reader<R> {
 
                     read_record_batch(&buf, batch, self.schema())
                 }
-                _ => unimplemented!(
-                    "reading types other than record batches not yet supported"
-                ),
+                _ => {
+                    return Err(ArrowError::IoError(
+                        "Reading types other than record batches not yet supported"
+                            .to_string(),
+                    ));
+                }
             }
         } else {
             Ok(None)
@@ -521,7 +543,119 @@ impl<R: Read + Seek> Reader<R> {
     }
 }
 
-impl<R: Read + Seek> RecordBatchReader for Reader<R> {
+impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
+    fn schema(&mut self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        self.next()
+    }
+}
+
+/// Arrow Stream reader
+pub struct StreamReader<R: Read> {
+    /// Buffered stream reader
+    reader: BufReader<R>,
+    /// The schema that is read from the stream's first message
+    schema: Arc<Schema>,
+    /// An indicator of whether the stream is complete.
+    ///
+    /// This value is set to `true` the first time the reader's `next()` returns `None`.
+    finished: bool,
+}
+
+impl<R: Read> StreamReader<R> {
+    /// Try to create a new stream reader
+    ///
+    /// The first message in the stream is the schema; the reader will fail if it does
+    /// not encounter a schema.
+    /// To check whether the reader is finished, use `is_finished`.
+    pub fn try_new(reader: R) -> Result<Self> {
+        let mut reader = BufReader::new(reader);
+        // determine metadata length
+        let mut meta_size: [u8; 4] = [0; 4];
+        reader.read_exact(&mut meta_size)?;
+        let meta_len = u32::from_le_bytes(meta_size);
+
+        let mut meta_buffer = vec![0; meta_len as usize];
+        reader.read_exact(&mut meta_buffer)?;
+
+        let vecs = &meta_buffer.to_vec();
+        let message = ipc::get_root_as_message(vecs);
+        // message header is a Schema, so read it
+        let ipc_schema: ipc::Schema = message.header_as_schema().ok_or(
+            ArrowError::IoError("Unable to read IPC message as schema".to_string()),
+        )?;
+        let schema = ipc::convert::fb_to_schema(ipc_schema);
+
+        Ok(Self {
+            reader,
+            schema: Arc::new(schema),
+            finished: false,
+        })
+    }
+
+    /// Return the schema of the stream
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Read the next record batch
+    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+        if self.finished {
+            return Ok(None);
+        }
+        // determine metadata length
+        let mut meta_size: [u8; 4] = [0; 4];
+        self.reader.read_exact(&mut meta_size)?;
+        let meta_len = u32::from_le_bytes(meta_size);
+
+        if meta_len == 0 {
+            // the stream has ended, mark the reader as finished
+            self.finished = true;
+            return Ok(None);
+        }
+
+        let mut meta_buffer = vec![0; meta_len as usize];
+        self.reader.read_exact(&mut meta_buffer)?;
+
+        let vecs = &meta_buffer.to_vec();
+        let message = ipc::get_root_as_message(vecs);
+
+        match message.header_type() {
+            ipc::MessageHeader::Schema => {
+                return Err(ArrowError::IoError(
+                    "Not expecting a schema when messages are read".to_string(),
+                ));
+            }
+            ipc::MessageHeader::RecordBatch => {
+                let batch =
+                    message.header_as_record_batch().ok_or(ArrowError::IoError(
+                        "Unable to read IPC message as record batch".to_string(),
+                    ))?;
+                // read the block that makes up the record batch into a buffer
+                let mut buf = vec![0; message.bodyLength() as usize];
+                self.reader.read_exact(&mut buf)?;
+
+                read_record_batch(&buf, batch, self.schema())
+            }
+            _ => {
+                return Err(ArrowError::IoError(
+                    "Reading types other than record batches not yet supported"
+                        .to_string(),
+                ));
+            }
+        }
+    }
+
+    /// Check if the stream is finished
+    pub fn is_finished(&self) -> bool {
+        self.finished
+    }
+}
+
+impl<R: Read> RecordBatchReader for StreamReader<R> {
     fn schema(&mut self) -> SchemaRef {
         self.schema.clone()
     }
@@ -560,11 +694,42 @@ mod tests {
             ))
             .unwrap();
 
-            let mut reader = Reader::try_new(file).unwrap();
+            let mut reader = FileReader::try_new(file).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(path);
+            assert!(arrow_json.equals_reader(&mut reader));
+        });
+    }
+
+    #[test]
+    fn read_generated_streams() {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec![
+            "generated_interval",
+            "generated_datetime",
+            "generated_nested",
+            "generated_primitive_no_batches",
+            "generated_primitive_zerolength",
+            "generated_primitive",
+        ];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc/integration/0.14.1/{}.stream",
+                testdata, path
+            ))
+            .unwrap();
+
+            let mut reader = StreamReader::try_new(file).unwrap();
 
             // read expected JSON output
             let arrow_json = read_gzip_json(path);
             assert!(arrow_json.equals_reader(&mut reader));
+            // the next batch must be empty
+            assert!(reader.next().unwrap().is_none());
+            // the stream must indicate that it's finished
+            assert!(reader.is_finished());
         });
     }
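
As noted in the module docs above, the two readers differ mainly in the bound on the
wrapped reader: `FileReader` requires `Read + Seek` so it can locate the footer and
seek to each record batch block, while `StreamReader` only requires `Read`. A minimal
sketch of the distinction (the file path is hypothetical, and stdin is only an example
of a non-seekable source):

    use std::fs::File;
    use std::io;
    use arrow::ipc::reader::{FileReader, StreamReader};

    fn main() {
        // `FileReader` needs `Read + Seek`, so it is typically backed by a `File`
        let file = File::open("batches.arrow").unwrap();
        let _file_reader = FileReader::try_new(file).unwrap();

        // `StreamReader` only needs `Read`, so any byte source will do, e.g. stdin
        // (this succeeds only if stdin actually carries an Arrow IPC stream)
        let stdin = io::stdin();
        let _stream_reader = StreamReader::try_new(stdin.lock()).unwrap();
    }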