You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/05 13:14:55 UTC
[arrow-rs] branch master updated: Allow creating unbuffered streamreader (#4165)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new a2bd26964 Allow creating unbuffered streamreader (#4165)
a2bd26964 is described below
commit a2bd2696435ef818f545bc09a55914bebaf493e6
Author: ming08108 <mi...@users.noreply.github.com>
AuthorDate: Fri May 5 08:14:49 2023 -0500
Allow creating unbuffered streamreader (#4165)
* allow creating unbuffered streamreader
* remove unneeded annotations
* address PR comments
---
arrow-ipc/src/reader.rs | 25 ++++++++++++++++++-------
1 file changed, 18 insertions(+), 7 deletions(-)
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index d19869616..162e92914 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -1075,8 +1075,8 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
/// Arrow Stream reader
pub struct StreamReader<R: Read> {
- /// Buffered stream reader
- reader: BufReader<R>,
+ /// Stream reader
+ reader: R,
/// The schema that is read from the stream's first message
schema: SchemaRef,
@@ -1107,8 +1107,8 @@ impl<R: Read> fmt::Debug for StreamReader<R> {
}
}
-impl<R: Read> StreamReader<R> {
- /// Try to create a new stream reader
+impl<R: Read> StreamReader<BufReader<R>> {
+ /// Try to create a new stream reader with the reader wrapped in a BufReader
///
/// The first message in the stream is the schema, the reader will fail if it does not
/// encounter a schema.
@@ -1117,7 +1117,18 @@ impl<R: Read> StreamReader<R> {
reader: R,
projection: Option<Vec<usize>>,
) -> Result<Self, ArrowError> {
- let mut reader = BufReader::new(reader);
+ Self::try_new_unbuffered(BufReader::new(reader), projection)
+ }
+}
+
+impl<R: Read> StreamReader<R> {
+ /// Try to create a new stream reader but do not wrap the reader in a BufReader.
+ ///
+ /// Unless you need the StreamReader to be unbuffered, you likely want to use `StreamReader::try_new` instead.
+ pub fn try_new_unbuffered(
+ mut reader: R,
+ projection: Option<Vec<usize>>,
+ ) -> Result<StreamReader<R>, ArrowError> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
reader.read_exact(&mut meta_size)?;
@@ -1262,14 +1273,14 @@ impl<R: Read> StreamReader<R> {
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_ref(&self) -> &R {
- self.reader.get_ref()
+ &self.reader
}
/// Gets a mutable reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
- self.reader.get_mut()
+ &mut self.reader
}
}