You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/05 13:14:55 UTC

[arrow-rs] branch master updated: Allow creating unbuffered streamreader (#4165)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new a2bd26964 Allow creating unbuffered streamreader (#4165)
a2bd26964 is described below

commit a2bd2696435ef818f545bc09a55914bebaf493e6
Author: ming08108 <mi...@users.noreply.github.com>
AuthorDate: Fri May 5 08:14:49 2023 -0500

    Allow creating unbuffered streamreader (#4165)
    
    * allow creating unbuffered streamreader
    
    * remove unneeded annotations
    
    * address PR comments
---
 arrow-ipc/src/reader.rs | 25 ++++++++++++++++++-------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index d19869616..162e92914 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -1075,8 +1075,8 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
 
 /// Arrow Stream reader
 pub struct StreamReader<R: Read> {
-    /// Buffered stream reader
-    reader: BufReader<R>,
+    /// Stream reader
+    reader: R,
 
     /// The schema that is read from the stream's first message
     schema: SchemaRef,
@@ -1107,8 +1107,8 @@ impl<R: Read> fmt::Debug for StreamReader<R> {
     }
 }
 
-impl<R: Read> StreamReader<R> {
-    /// Try to create a new stream reader
+impl<R: Read> StreamReader<BufReader<R>> {
+    /// Try to create a new stream reader with the reader wrapped in a BufReader
     ///
     /// The first message in the stream is the schema, the reader will fail if it does not
     /// encounter a schema.
@@ -1117,7 +1117,18 @@ impl<R: Read> StreamReader<R> {
         reader: R,
         projection: Option<Vec<usize>>,
     ) -> Result<Self, ArrowError> {
-        let mut reader = BufReader::new(reader);
+        Self::try_new_unbuffered(BufReader::new(reader), projection)
+    }
+}
+
+impl<R: Read> StreamReader<R> {
+    /// Try to create a new stream reader but do not wrap the reader in a BufReader.
+    ///
+    /// Unless you need the StreamReader to be unbuffered, you likely want to use `StreamReader::try_new` instead.
+    pub fn try_new_unbuffered(
+        mut reader: R,
+        projection: Option<Vec<usize>>,
+    ) -> Result<StreamReader<R>, ArrowError> {
         // determine metadata length
         let mut meta_size: [u8; 4] = [0; 4];
         reader.read_exact(&mut meta_size)?;
@@ -1262,14 +1273,14 @@ impl<R: Read> StreamReader<R> {
     ///
     /// It is inadvisable to directly read from the underlying reader.
     pub fn get_ref(&self) -> &R {
-        self.reader.get_ref()
+        &self.reader
     }
 
     /// Gets a mutable reference to the underlying reader.
     ///
     /// It is inadvisable to directly read from the underlying reader.
     pub fn get_mut(&mut self) -> &mut R {
-        self.reader.get_mut()
+        &mut self.reader
     }
 }