Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/06 15:38:11 UTC

[GitHub] [arrow-rs] tustvold opened a new pull request, #1803: Add AsyncChunkReader trait

tustvold opened a new pull request, #1803:
URL: https://github.com/apache/arrow-rs/pull/1803

   # Which issue does this PR close?
   
   Part of #1605
   
   # Rationale for this change
    
   I originally wanted to introduce an API that would afford clients greater control over how data is fetched, but particularly with the in-flight work to support column indexes (#1749 and #1191), it is unclear exactly what this interface should look like. Rather than potentially painting ourselves into a corner, let's keep the interface high-level; we can introduce a lower-level API as and when desired.
   
   # What changes are included in this PR?
   
   This extracts an `AsyncChunkReader` trait that exposes an API for fetching byte ranges. For backwards compatibility, a blanket implementation is provided for `AsyncRead + AsyncSeek`. This gives DataFusion a fairly straightforward place to plug in an object store that supports byte-range fetches.
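   
   As a hedged illustration of how a client might plug in (not part of this PR -- `RangedStore` and its `fetch_range` method are hypothetical stand-ins for an object store API with ranged GETs, and `futures::FutureExt` is assumed in scope for `.boxed()`), an implementation could look roughly like this, with the metadata supplied up front rather than re-parsed from the footer:
   
   ```rust
   /// Hypothetical reader over a store that supports byte-range requests.
   struct RangedStoreReader {
       store: RangedStore,
       metadata: Arc<ParquetMetaData>,
   }
   
   impl AsyncChunkReader for RangedStoreReader {
       fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
           // Issue a single ranged request instead of seek + read
           async move { self.store.fetch_range(range).await }.boxed()
       }
   
       fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
           // Metadata can come from a catalog or cache rather than the file footer
           futures::future::ready(Ok(self.metadata.clone())).boxed()
       }
   }
   ```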
   
   # Are there any user-facing changes?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1803: Add AsyncChunkReader trait

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890464300


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> AsyncChunkReader for T {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            self.seek(SeekFrom::Start(range.start as u64)).await?;

Review Comment:
   The method takes a mutable reference and returns a future with the same lifetime, so the two calls can't race.
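   
   A minimal sketch of why this holds (illustrative only, using the trait as defined in the diff): the returned `BoxFuture<'_>` mutably borrows the reader, so the borrow checker forces the calls to run strictly one after another:
   
   ```rust
   // Each call mutably borrows `reader` for the lifetime of the returned future,
   // so a second `get_bytes`/`get_metadata` cannot start until the first future
   // has completed (or been dropped) -- there is no window for a seek to race.
   async fn read_two<R: AsyncChunkReader>(reader: &mut R) -> Result<(Bytes, Bytes)> {
       let first = reader.get_bytes(0..16).await?;   // borrow ends here
       let second = reader.get_bytes(16..32).await?; // next borrow can begin
       Ok((first, second))
   }
   ```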





[GitHub] [arrow-rs] tustvold merged pull request #1803: Add AsyncFileReader trait

Posted by GitBox <gi...@apache.org>.
tustvold merged PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1803: Add AsyncFileReader trait

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890967716


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

Review Comment:
   I went with `AsyncFileReader` and added a load of doc comments





[GitHub] [arrow-rs] codecov-commenter commented on pull request #1803: Add AsyncFileReader trait

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#issuecomment-1148425789

   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1803?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1803](https://codecov.io/gh/apache/arrow-rs/pull/1803?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c6d8230) into [master](https://codecov.io/gh/apache/arrow-rs/commit/c8d4323826b81e5a338b03e307ad7d29b20de2bf?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c8d4323) will **decrease** coverage by `0.00%`.
   > The diff coverage is `80.06%`.
   
   > :exclamation: Current head c6d8230 differs from pull request most recent head 98515a0. Consider uploading reports for the commit 98515a0 to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1803      +/-   ##
   ==========================================
   - Coverage   83.42%   83.42%   -0.01%     
   ==========================================
     Files         198      199       +1     
     Lines       56327    56632     +305     
   ==========================================
   + Hits        46990    47243     +253     
   - Misses       9337     9389      +52     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1803?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/async\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXN5bmNfcmVhZGVyLnJz) | `0.00% <0.00%> (ø)` | |
   | [parquet/src/file/metadata.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9tZXRhZGF0YS5ycw==) | `95.12% <ø> (ø)` | |
   | [parquet/src/file/serialized\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9zZXJpYWxpemVkX3JlYWRlci5ycw==) | `94.46% <ø> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `87.17% <0.00%> (-2.30%)` | :arrow_down: |
   | [parquet/src/file/page\_index/range.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9wYWdlX2luZGV4L3JhbmdlLnJz) | `90.26% <90.26%> (ø)` | |
   | [arrow/src/ipc/writer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2lwYy93cml0ZXIucnM=) | `81.78% <95.12%> (+0.85%)` | :arrow_up: |
   | [arrow/src/ipc/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2lwYy9yZWFkZXIucnM=) | `90.82% <100.00%> (+0.08%)` | :arrow_up: |
   | [parquet/src/encodings/encoding.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZW5jb2RpbmdzL2VuY29kaW5nLnJz) | `93.65% <0.00%> (+0.19%)` | :arrow_up: |
   | [arrow/src/ipc/gen/Message.rs](https://codecov.io/gh/apache/arrow-rs/pull/1803/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2lwYy9nZW4vTWVzc2FnZS5ycw==) | `35.39% <0.00%> (+0.74%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1803?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1803?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [c8d4323...98515a0](https://codecov.io/gh/apache/arrow-rs/pull/1803?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [arrow-rs] viirya commented on a diff in pull request #1803: Add AsyncChunkReader trait

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890453518


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> AsyncChunkReader for T {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            self.seek(SeekFrom::Start(range.start as u64)).await?;

Review Comment:
   Hmm, since `get_bytes` and `get_metadata` are both async, and they both call `seek`, is there any chance of a race condition between them? For example, calling `get_bytes` first, but the file position is changed by a call to `get_metadata` next?





[GitHub] [arrow-rs] alamb commented on a diff in pull request #1803: Add AsyncChunkReader trait

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890428403


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

Review Comment:
   `ParquetReaderAccess`?
   
   The use case of cached / cataloged metadata is a good one -- perhaps we can just add a comment explaining the rationale.





[GitHub] [arrow-rs] alamb commented on a diff in pull request #1803: Add AsyncChunkReader trait

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890535439


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

Review Comment:
   Yes, I agree it is now more confusing -- hoping a good night's sleep will make it clearer. I am happy with whatever you decide.





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1803: Add AsyncChunkReader trait

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890414738


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

Review Comment:
   `AsyncParquetFile`?
   
   I agree it is a little odd, but I want to give flexibility in how this metadata is sourced, to allow for caching, pre-fetching, etc...
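   
   For illustration, one hypothetical shape that flexibility could take (names made up, not code in this PR; assumes the surrounding module's imports, e.g. `futures::FutureExt` for `.boxed()`) is a wrapper that memoizes the parsed metadata so repeated opens skip the footer read:
   
   ```rust
   /// Hypothetical wrapper that caches the metadata returned by an inner reader.
   struct CachedMetadata<R> {
       inner: R,
       metadata: Option<Arc<ParquetMetaData>>,
   }
   
   impl<R: AsyncChunkReader> AsyncChunkReader for CachedMetadata<R> {
       fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
           // Byte reads are delegated unchanged
           self.inner.get_bytes(range)
       }
   
       fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
           async move {
               if let Some(metadata) = &self.metadata {
                   return Ok(Arc::clone(metadata));
               }
               // First call: fetch and parse via the inner reader, then remember it
               let metadata = self.inner.get_metadata().await?;
               self.metadata = Some(Arc::clone(&metadata));
               Ok(metadata)
           }
           .boxed()
       }
   }
   ```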





[GitHub] [arrow-rs] alamb commented on a diff in pull request #1803: Add AsyncChunkReader trait

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890410756


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -477,3 +508,82 @@ impl PageIterator for ColumnChunkIterator {
         Ok(self.column_schema.clone())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use arrow::error::Result as ArrowResult;
+    use futures::TryStreamExt;
+    use std::sync::Mutex;
+
+    struct TestReader {
+        data: Bytes,
+        metadata: Arc<ParquetMetaData>,
+        requests: Arc<Mutex<Vec<Range<usize>>>>,
+    }
+
+    impl AsyncChunkReader for TestReader {
+        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+            self.requests.lock().unwrap().push(range.clone());
+            futures::future::ready(Ok(self.data.slice(range))).boxed()
+        }
+
+        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+            futures::future::ready(Ok(self.metadata.clone())).boxed()
+        }
+    }
+
+    #[tokio::test]
+    async fn test_async_reader() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let requests = async_reader.requests.clone();
+        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+        let stream = builder
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+
+        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+        let mut sync_reader = ParquetFileArrowReader::try_new(data).unwrap();
+        let sync_batches = sync_reader
+            .get_record_reader_by_columns(mask, 1024)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(async_batches, sync_batches);
+
+        let requests = requests.lock().unwrap();
+        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
+        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
+
+        assert_eq!(

Review Comment:
   👍 



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

Review Comment:
   I find it a little strange that `get_metadata` is part of `AsyncChunkReader`, as I would have expected the "read bytes" and "logically read and decode parquet data" concerns to be more separated.
   
   Would it make sense to consider two separate traits? Something like the following, perhaps 🤔
   
   ```rust
   /// A reader that can asynchronously read a range of bytes
   pub trait AsyncChunkReader: Send + Unpin + 'static {
       /// Retrieve the bytes in `range`
       fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
   }
   
   /// Returns parquet metadata, possibly asynchronously 
   pub trait AsyncParquetReader: Send + Unpin + 'static {
       /// Retrieve the [`ParquetMetaData`] for this file
       fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
   }
   ```
   
   Or maybe call it `AsyncChunkedParquetReader`? (though I admit that is getting to be a mouthful)



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -309,25 +369,24 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
                             let mut column_chunks =
                                 vec![None; row_group_metadata.columns().len()];
 
+                            // TODO: Combine consecutive ranges
                             for (idx, chunk) in column_chunks.iter_mut().enumerate() {
                                 if !projection.leaf_included(idx) {
                                     continue;
                                 }
 
                                 let column = row_group_metadata.column(idx);
                                 let (start, length) = column.byte_range();
-                                let end = start + length;
-
-                                input.seek(SeekFrom::Start(start)).await?;
 
-                                let mut buffer = vec![0_u8; (end - start) as usize];
-                                input.read_exact(buffer.as_mut_slice()).await?;
+                                let data = input

Review Comment:
   this code is just factored into `AsyncChunkReader::get_bytes`, correct?
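   
   For reference, a hedged reconstruction of where that logic goes (the diff above truncates the blanket impl; this assumes tokio's `AsyncReadExt`/`AsyncSeekExt` and `futures::FutureExt` are in scope):
   
   ```rust
   // Sketch of the blanket impl for T: AsyncRead + AsyncSeek -- the former
   // seek + read_exact from the stream loop now lives inside `get_bytes`.
   fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
       async move {
           self.seek(SeekFrom::Start(range.start as u64)).await?;
           let mut buffer = vec![0_u8; range.end - range.start];
           self.read_exact(&mut buffer).await?;
           Ok(buffer.into())
       }
       .boxed()
   }
   ```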





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1803: Add AsyncChunkReader trait

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1803:
URL: https://github.com/apache/arrow-rs/pull/1803#discussion_r890496351


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -101,8 +103,63 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// A reader that can asynchronously read a range of bytes
+pub trait AsyncChunkReader: Send + Unpin + 'static {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Retrieve the [`ParquetMetaData`] for this file
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

Review Comment:
   I've split the traits up, but it feels very odd to me to have two traits implemented on the same type that is then passed to `ParquetRecordBatchStreamBuilder::new`...
   
   I'll revisit in the morning; I feel this has just made it more confusing, tbh...
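   
   For illustration, the shape being described (hypothetical signature, not code in this PR): with the traits split, every entry point has to carry both bounds on the one input type, even though any practical reader implements both:
   
   ```rust
   // Hypothetical: both bounds always travel together, which is what feels odd.
   async fn open<T>(mut input: T) -> Result<()>
   where
       T: AsyncChunkReader + AsyncParquetReader,
   {
       let _metadata = input.get_metadata().await?;  // from AsyncParquetReader
       let _bytes = input.get_bytes(0..1024).await?; // from AsyncChunkReader
       Ok(())
   }
   ```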


