You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/06 18:15:56 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #1803: Add AsyncChunkReader trait

alamb commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890410756


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -477,3 +508,82 @@ impl PageIterator for ColumnChunkIterator {
         Ok(self.column_schema.clone())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use arrow::error::Result as ArrowResult;
+    use futures::TryStreamExt;
+    use std::sync::Mutex;
+
+    struct TestReader {
+        data: Bytes,
+        metadata: Arc<ParquetMetaData>,
+        requests: Arc<Mutex<Vec<Range<usize>>>>,
+    }
+
+    impl AsyncChunkReader for TestReader {
+        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+            self.requests.lock().unwrap().push(range.clone());
+            futures::future::ready(Ok(self.data.slice(range))).boxed()
+        }
+
+        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+            futures::future::ready(Ok(self.metadata.clone())).boxed()
+        }
+    }
+
+    #[tokio::test]
+    async fn test_async_reader() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let requests = async_reader.requests.clone();
+        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+        let stream = builder
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+
+        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+        let mut sync_reader = ParquetFileArrowReader::try_new(data).unwrap();
+        let sync_batches = sync_reader
+            .get_record_reader_by_columns(mask, 1024)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(async_batches, sync_batches);
+
+        let requests = requests.lock().unwrap();
+        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
+        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
+
+        assert_eq!(

Review Comment:
   👍 



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

Review Comment:
   I find it a little strange that `get_metadata` is part of `AsyncChunkReader`, as I would have expected the "read bytes" and "logically read and decode parquet data" concerns to be more separated
   
   Would it make sense to consider two separate traits? Something like the following perhaps 🤔 
   
   ```rust
   /// A reader that can asynchronously read a range of bytes
   pub trait AsyncChunkReader: Send + Unpin + 'static {
       /// Retrieve the bytes in `range`
       fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
   }
   
   /// Returns parquet metadata, possibly asynchronously 
   pub trait AsyncParquetReader: Send + Unpin + 'static {
        /// Retrieve the [`ParquetMetaData`] for this file
       fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
   }
   ```
   
   Or maybe call it `AsyncChunkedParquetReader`? (though I admit that is getting to be a mouthful)



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -309,25 +369,24 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
                             let mut column_chunks =
                                 vec![None; row_group_metadata.columns().len()];
 
+                            // TODO: Combine consecutive ranges
                             for (idx, chunk) in column_chunks.iter_mut().enumerate() {
                                 if !projection.leaf_included(idx) {
                                     continue;
                                 }
 
                                 let column = row_group_metadata.column(idx);
                                 let (start, length) = column.byte_range();
-                                let end = start + length;
-
-                                input.seek(SeekFrom::Start(start)).await?;
 
-                                let mut buffer = vec![0_u8; (end - start) as usize];
-                                input.read_exact(buffer.as_mut_slice()).await?;
+                                let data = input

Review Comment:
   this code is just factored into `AsyncChunkReader::get_bytes` correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org