You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/11 10:21:17 UTC

[GitHub] [arrow-rs] tustvold opened a new pull request #1154: POC: Async parquet reader

tustvold opened a new pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154


   **Proof of concept, tests are currently extremely limited**
   
   # Which issue does this PR close?
   
   Closes #111 .
   
   # Rationale for this change
   
   See ticket, in particular I wanted to confirm that it is possible to create an async parquet reader without any major changes to the parquet crate. This seems to come up as a frequent ask from the community, and I think we could support it without any major churn.
   
   # What changes are included in this PR?
   
   Adds a layer of indirection to `array_reader` to abstract it away from files, _I think this change may stand on its own merits_.
   
   It then adds a ParquetRecordBatchStream which is a `Stream` that yields `RecordBatch`. Under the hood, this uses async to read row groups into memory and then feeds these into the non-async decoders. 
   
   The [parquet docs](https://parquet.apache.org/documentation/latest/) describe the column chunk as the unit of IO, and so I think buffering compressed row groups in memory is not an impractical approach. It also avoids having to maintain sync and async version of all the decoders, readers, etc...
   
   # Are there any user-facing changes?
   
   The only changes are to `array_reader` which since #1133 no longer has stability guarantees
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1024915824


   Actually, I see https://github.com/apache/arrow-datafusion/pull/1617 demonstrates what impacts this has on DataFusion, which seems just fine πŸ‘ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030632686


   > the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   I agree, in so much as whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, I would anticipate using to refine the data `ParquetRecordBatchStream` fetches prior to decode. That being said, currently this crate doesn't even support decoding the index pages, see [here](https://github.com/apache/arrow-rs/blob/master/parquet/src/file/serialized_reader.rs#L388), let alone doing anything with them :sweat_smile: 
   
   > so continuously downloading in the background for data the client
   
   This PR does not add functionality for doing this, it adds hooks for a query engine to use for doing this by providing something implementing `AsyncRead` and `AsyncSeek`. This has been a frequent ask within Datafusion and https://github.com/apache/arrow-datafusion/pull/1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actual fetching of data from object storage
   
   > complicating all existing client by the added "Send" constraint.
   
   Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are `Send`. Is there a particular one causing an issue, as we could potentially feature gate it behind the `async` feature flag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1024901197


   Actually, don't we have to add `async` to the tests to ensure CI coverage?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030632686


   > the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   I agree, in so much as whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, I would anticipate using to refine the data `ParquetRecordBatchStream` fetches prior to decode. That being said, currently this crate doesn't even support decoding the index pages, see [here](https://github.com/apache/arrow-rs/blob/master/parquet/src/file/serialized_reader.rs#L388), let alone writing or doing anything with them :sweat_smile: 
   
   > so continuously downloading in the background for data the client
   
   This PR does not add functionality for doing this, it adds hooks for a query engine to use for doing this by providing something implementing `AsyncRead` and `AsyncSeek`. This has been a frequent ask within Datafusion and https://github.com/apache/arrow-datafusion/pull/1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actual fetching of data from object storage, save for requesting the necessary byte ranges from the reader implementation, limited by any projections, filters, or row group selections pushed down by the query engine.
   
   > complicating all existing client by the added "Send" constraint.
   
   Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are `Send`. Is there a particular one causing an issue, as we could potentially feature gate it behind the `async` feature flag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030632686


   > the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   I agree, in so much as whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, I would anticipate using to refine the data `ParquetRecordBatchStream` fetches prior to decode. That being said, currently this crate doesn't even support decoding the index pages, see [here](https://github.com/apache/arrow-rs/blob/master/parquet/src/file/serialized_reader.rs#L388), let alone doing anything with them :sweat_smile: 
   
   > so continuously downloading in the background for data the client
   
   This PR does not add functionality for doing this, it adds hooks for a query engine to use for doing this by providing something implementing `AsyncRead` and `AsyncSeek`. This has been a frequent ask within Datafusion and https://github.com/apache/arrow-datafusion/pull/1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actual fetching of data from object storage, save for requesting the necessary byte ranges from the reader implementation, limited by any projections, filters, or row group selections pushed down by the query engine.
   
   > complicating all existing client by the added "Send" constraint.
   
   Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are `Send`. Is there a particular one causing an issue, as we could potentially feature gate it behind the `async` feature flag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030632686


   > the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   I agree, in so much as whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, I would anticipate using to refine the data `ParquetRecordBatchStream` fetches prior to decode. That being said, currently this crate doesn't even support decoding the index pages, let alone doing anything with them :sweat_smile: 
   
   > so continuously downloading in the background for data the client
   
   This PR does not add functionality for doing this, it adds hooks for a query engine to use for doing this by providing something implementing `AsyncRead and AsyncSeek`. This has been a frequent ask within Datafusion and https://github.com/apache/arrow-datafusion/pull/1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actually fetching data from object storage
   
   > complicating all existing client by the added "Send" constraint.
   
   Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are `Send`. Is there a particular one causing an issue, as we could potentially feature gate it behind the `async` feature flag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1027848111


   Thanks again @tustvold  πŸ‘ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yjshen commented on pull request #1154: POC: `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1010574488


   Exciting news, thanks @tustvold 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1154: POC: Async parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r782415052



##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for interacting with parquet files
+
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+
+/// A [`Stream`] of [`RecordBatch`] for a parquet file
+pub struct ParquetRecordBatchStream<T> {
+    metadata: Arc<ParquetMetaData>,
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    /// The next row group to read
+    next_row_group_idx: usize,
+
+    /// This is an option so it can be moved into a future
+    input: Option<T>,
+
+    state: StreamState<T>,
+}
+
+enum StreamState<T> {
+    Init,
+    /// Decoding a batch
+    Decoding(ParquetRecordBatchReader),
+    /// Reading data from input
+    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    /// Error
+    Error,
+}
+
+impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchStream")
+            .field("metadata", &self.metadata)
+            .field("schema", &self.schema)
+            .finish()

Review comment:
       a version of the current state is probably also valuable

##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for interacting with parquet files
+
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+
+/// A [`Stream`] of [`RecordBatch`] for a parquet file
+pub struct ParquetRecordBatchStream<T> {
+    metadata: Arc<ParquetMetaData>,
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    /// The next row group to read
+    next_row_group_idx: usize,
+
+    /// This is an option so it can be moved into a future
+    input: Option<T>,
+
+    state: StreamState<T>,
+}
+
+enum StreamState<T> {
+    Init,
+    /// Decoding a batch
+    Decoding(ParquetRecordBatchReader),
+    /// Reading data from input
+    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    /// Error
+    Error,
+}
+
+impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchStream")
+            .field("metadata", &self.metadata)
+            .field("schema", &self.schema)
+            .finish()
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStream<T> {
+    /// Create a new [`ParquetRecordBatchStream`] from the provided [`File`]

Review comment:
       ```suggestion
       /// Create a new [`ParquetRecordBatchStream`] from the provided async input source
   ```

##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for interacting with parquet files

Review comment:
       ```suggestion
   //! Contains asynchronous APIs for reading with parquet files into Arrow `RecordBatch`es
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1154: POC: Async parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r782005696



##########
File path: parquet/src/file/footer.rs
##########
@@ -78,7 +78,6 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
 
     // build up the reader covering the entire metadata
     let mut default_end_cursor = Cursor::new(default_len_end_buf);
-    let metadata_read: Box<dyn Read>;

Review comment:
       Drive by cleanup - this dynamic dispatch isn't necessary any more




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb merged pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1009835577


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1154](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (80c1978) into [master](https://codecov.io/gh/apache/arrow-rs/commit/aa71aeaa3d6f0345690490588640226701e1ac15?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aa71aea) will **decrease** coverage by `0.06%`.
   > The diff coverage is `21.93%`.
   
   > :exclamation: Current head 80c1978 differs from pull request most recent head 38e2225. Consider uploading reports for the commit 38e2225 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1154/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1154      +/-   ##
   ==========================================
   - Coverage   82.96%   82.90%   -0.07%     
   ==========================================
     Files         178      180       +2     
     Lines       51522    51969     +447     
   ==========================================
   + Hits        42744    43083     +339     
   - Misses       8778     8886     +108     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Ξ” | |
   |---|---|---|
   | [parquet/src/arrow/async\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXN5bmNfcmVhZGVyLnJz) | `0.00% <0.00%> (ΓΈ)` | |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `94.07% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/arrow/record\_reader/buffer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9idWZmZXIucnM=) | `92.42% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/compression.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29tcHJlc3Npb24ucnM=) | `88.59% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/data\_type.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZGF0YV90eXBlLnJz) | `76.61% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/encodings/decoding.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZW5jb2RpbmdzL2RlY29kaW5nLnJz) | `90.45% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/file/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9yZWFkZXIucnM=) | `75.47% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/file/serialized\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9zZXJpYWxpemVkX3JlYWRlci5ycw==) | `94.37% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/util/test\_common/page\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC90ZXN0X2NvbW1vbi9wYWdlX3V0aWwucnM=) | `88.88% <ΓΈ> (ΓΈ)` | |
   | ... and [11 more](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Ξ” = absolute <relative> (impact)`, `ΓΈ = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [aa71aea...38e2225](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] zeevm commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
zeevm commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030627167


   I see a few issues with this.
   
   First, the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   Second, a major premise of Parquet is "read only what you need", where what you need is usually dictated by some query engine, so continuously downloading in the background for data the client may doesn't even want or need doesn't seem right, especially as the cost is complicating all existing client by the added "Send" constraint. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r795044139



##########
File path: parquet/Cargo.toml
##########
@@ -55,24 +57,26 @@ brotli = "3.3"
 flate2 = "1.0"
 lz4 = "1.23"
 serde_json = { version = "1.0", features = ["preserve_order"] }
-arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
+arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }
 
 [features]
 default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
 cli = ["serde_json", "base64", "clap"]
 test_common = []
 # Experimental, unstable functionality primarily used for testing
 experimental = []
+# Experimental, unstable, async API

Review comment:
       ```suggestion
   # Enable async API
   ```

##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,481 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for reading parquet files into
+//! arrow [`RecordBatch`]
+
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::util::memory::ByteBufferPtr;
+
+/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+///
+/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// to use this information to select what specific columns, row groups, etc...
+/// they wish to be read by the resulting stream
+///
+pub struct ParquetRecordBatchStreamBuilder<T> {
+    input: T,
+
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    row_groups: Option<Vec<usize>>,
+
+    projection: Option<Vec<usize>>,
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    pub async fn new(mut input: T) -> Result<Self> {
+        let metadata = Arc::new(read_footer(&mut input).await?);
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            metadata.file_metadata().key_value_metadata(),
+        )?);
+
+        Ok(Self {
+            input,
+            metadata,
+            schema,
+            batch_size: 1024,
+            row_groups: None,
+            projection: None,
+        })
+    }
+
+    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.metadata
+    }
+
+    /// Returns the arrow [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Set the size of [`RecordBatch`] to produce
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Only read data from the provided row group indexes
+    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
+    /// Only read data from the provided column indexes
+    pub fn with_projection(self, projection: Vec<usize>) -> Self {
+        Self {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Build a new [`ParquetRecordBatchStream`]
+    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
+        let num_columns = self.schema.fields().len();
+        let num_row_groups = self.metadata.row_groups().len();
+
+        let columns = match self.projection {
+            Some(projection) => {
+                if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
+                    return Err(general_err!(
+                        "column projection {} outside bounds of schema 0..{}",
+                        col,
+                        num_columns
+                    ));
+                }
+                projection
+            }
+            None => (0..num_columns).collect::<Vec<_>>(),
+        };
+
+        let row_groups = match self.row_groups {
+            Some(row_groups) => {
+                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
+                    return Err(general_err!(
+                        "row group {} out of bounds 0..{}",
+                        col,
+                        num_row_groups
+                    ));
+                }
+                row_groups.into()
+            }
+            None => (0..self.metadata.row_groups().len()).collect(),
+        };
+
+        Ok(ParquetRecordBatchStream {
+            row_groups,
+            columns: columns.into(),
+            batch_size: self.batch_size,
+            metadata: self.metadata,
+            schema: self.schema,
+            input: Some(self.input),
+            state: StreamState::Init,
+        })
+    }
+}
+
+enum StreamState<T> {
+    Init,
+    /// Decoding a batch
+    Decoding(ParquetRecordBatchReader),
+    /// Reading data from input
+    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    /// Error
+    Error,
+}
+
+impl<T> std::fmt::Debug for StreamState<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            StreamState::Init => write!(f, "StreamState::Init"),
+            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
+            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
+            StreamState::Error => write!(f, "StreamState::Error"),
+        }
+    }
+}
+
+/// A [`Stream`] of [`RecordBatch`] for a parquet file

Review comment:
       ```suggestion
   /// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file
   ```

##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,481 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for reading parquet files into
+//! arrow [`RecordBatch`]
+
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::util::memory::ByteBufferPtr;
+
+/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+///
+/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// to use this information to select what specific columns, row groups, etc...
+/// they wish to be read by the resulting stream
+///
+pub struct ParquetRecordBatchStreamBuilder<T> {
+    input: T,
+
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    row_groups: Option<Vec<usize>>,
+
+    projection: Option<Vec<usize>>,
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    pub async fn new(mut input: T) -> Result<Self> {
+        let metadata = Arc::new(read_footer(&mut input).await?);
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            metadata.file_metadata().key_value_metadata(),
+        )?);
+
+        Ok(Self {
+            input,
+            metadata,
+            schema,
+            batch_size: 1024,
+            row_groups: None,
+            projection: None,
+        })
+    }
+
+    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.metadata
+    }
+
+    /// Returns the arrow [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Set the size of [`RecordBatch`] to produce
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Only read data from the provided row group indexes
+    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
+    /// Only read data from the provided column indexes
+    pub fn with_projection(self, projection: Vec<usize>) -> Self {
+        Self {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Build a new [`ParquetRecordBatchStream`]
+    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
+        let num_columns = self.schema.fields().len();
+        let num_row_groups = self.metadata.row_groups().len();
+
+        let columns = match self.projection {
+            Some(projection) => {
+                if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
+                    return Err(general_err!(
+                        "column projection {} outside bounds of schema 0..{}",
+                        col,
+                        num_columns
+                    ));
+                }
+                projection
+            }
+            None => (0..num_columns).collect::<Vec<_>>(),
+        };
+
+        let row_groups = match self.row_groups {
+            Some(row_groups) => {
+                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
+                    return Err(general_err!(
+                        "row group {} out of bounds 0..{}",
+                        col,
+                        num_row_groups
+                    ));
+                }
+                row_groups.into()
+            }
+            None => (0..self.metadata.row_groups().len()).collect(),
+        };
+
+        Ok(ParquetRecordBatchStream {
+            row_groups,
+            columns: columns.into(),
+            batch_size: self.batch_size,
+            metadata: self.metadata,
+            schema: self.schema,
+            input: Some(self.input),
+            state: StreamState::Init,
+        })
+    }
+}
+
+enum StreamState<T> {
+    Init,

Review comment:
       ```suggestion
       /// At the start of a new row group, or the end of the parquet stream
       Init,
   ```

##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
 rand = "0.8"
+futures = { version = "0.3", optional = true }

Review comment:
       πŸ‘  for optional feature

##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,481 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for reading parquet files into
+//! arrow [`RecordBatch`]
+
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::util::memory::ByteBufferPtr;
+
+/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+///
+/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// to use this information to select what specific columns, row groups, etc...
+/// they wish to be read by the resulting stream
+///
+pub struct ParquetRecordBatchStreamBuilder<T> {
+    input: T,
+
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    row_groups: Option<Vec<usize>>,
+
+    projection: Option<Vec<usize>>,
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    pub async fn new(mut input: T) -> Result<Self> {
+        let metadata = Arc::new(read_footer(&mut input).await?);
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            metadata.file_metadata().key_value_metadata(),
+        )?);
+
+        Ok(Self {
+            input,
+            metadata,
+            schema,
+            batch_size: 1024,
+            row_groups: None,
+            projection: None,
+        })
+    }
+
+    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.metadata
+    }
+
+    /// Returns the arrow [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Set the size of [`RecordBatch`] to produce
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Only read data from the provided row group indexes
+    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
+    /// Only read data from the provided column indexes
+    pub fn with_projection(self, projection: Vec<usize>) -> Self {
+        Self {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Build a new [`ParquetRecordBatchStream`]
+    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
+        let num_columns = self.schema.fields().len();
+        let num_row_groups = self.metadata.row_groups().len();
+
+        let columns = match self.projection {
+            Some(projection) => {
+                if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
+                    return Err(general_err!(
+                        "column projection {} outside bounds of schema 0..{}",
+                        col,
+                        num_columns
+                    ));
+                }
+                projection
+            }
+            None => (0..num_columns).collect::<Vec<_>>(),
+        };
+
+        let row_groups = match self.row_groups {
+            Some(row_groups) => {
+                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
+                    return Err(general_err!(
+                        "row group {} out of bounds 0..{}",
+                        col,
+                        num_row_groups
+                    ));
+                }
+                row_groups.into()
+            }
+            None => (0..self.metadata.row_groups().len()).collect(),
+        };
+
+        Ok(ParquetRecordBatchStream {
+            row_groups,
+            columns: columns.into(),
+            batch_size: self.batch_size,
+            metadata: self.metadata,
+            schema: self.schema,
+            input: Some(self.input),
+            state: StreamState::Init,
+        })
+    }
+}
+
+enum StreamState<T> {
+    Init,
+    /// Decoding a batch
+    Decoding(ParquetRecordBatchReader),
+    /// Reading data from input
+    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    /// Error
+    Error,
+}
+
+impl<T> std::fmt::Debug for StreamState<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            StreamState::Init => write!(f, "StreamState::Init"),
+            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
+            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
+            StreamState::Error => write!(f, "StreamState::Error"),
+        }
+    }
+}
+
+/// A [`Stream`] of [`RecordBatch`] for a parquet file
+pub struct ParquetRecordBatchStream<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    columns: Arc<[usize]>,
+
+    row_groups: VecDeque<usize>,
+
+    /// This is an option so it can be moved into a future
+    input: Option<T>,
+
+    state: StreamState<T>,
+}
+
+impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchStream")
+            .field("metadata", &self.metadata)
+            .field("schema", &self.schema)
+            .field("batch_size", &self.batch_size)
+            .field("columns", &self.columns)
+            .field("state", &self.state)
+            .finish()
+    }
+}
+
+impl<T> ParquetRecordBatchStream<T> {
+    /// Returns the [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
+    for ParquetRecordBatchStream<T>
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            match &mut self.state {
+                StreamState::Decoding(batch_reader) => match batch_reader.next() {
+                    Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
+                    Some(Err(e)) => {
+                        self.state = StreamState::Error;
+                        return Poll::Ready(Some(Err(ParquetError::ArrowError(
+                            e.to_string(),
+                        ))));
+                    }
+                    None => self.state = StreamState::Init,
+                },
+                StreamState::Init => {
+                    let row_group_idx = match self.row_groups.pop_front() {
+                        Some(idx) => idx,
+                        None => return Poll::Ready(None),
+                    };
+
+                    let metadata = self.metadata.clone();
+                    let mut input = match self.input.take() {
+                        Some(input) => input,
+                        None => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(general_err!(
+                                "input stream lost"
+                            ))));
+                        }
+                    };
+
+                    let columns = Arc::clone(&self.columns);
+
+                    self.state = StreamState::Reading(
+                        async move {
+                            let row_group_metadata = metadata.row_group(row_group_idx);
+                            let mut column_chunks =
+                                vec![None; row_group_metadata.columns().len()];
+
+                            for column_idx in columns.iter() {
+                                let column = row_group_metadata.column(*column_idx);
+                                let (start, length) = column.byte_range();
+                                let end = start + length;
+
+                                input.seek(SeekFrom::Start(start)).await?;
+
+                                let mut buffer = vec![0_u8; (end - start) as usize];
+                                input.read_exact(buffer.as_mut_slice()).await?;
+
+                                column_chunks[*column_idx] = Some(InMemoryColumnChunk {
+                                    num_values: column.num_values(),
+                                    compression: column.compression(),
+                                    physical_type: column.column_type(),
+                                    data: ByteBufferPtr::new(buffer),
+                                });
+                            }
+
+                            Ok((
+                                input,
+                                InMemoryRowGroup {
+                                    schema: metadata.file_metadata().schema_descr_ptr(),
+                                    column_chunks,
+                                },
+                            ))
+                        }
+                        .boxed(),
+                    )
+                }
+                StreamState::Reading(f) => {
+                    let result = futures::ready!(f.poll_unpin(cx));
+                    self.state = StreamState::Init;
+
+                    let row_group: Box<dyn RowGroupCollection> = match result {
+                        Ok((input, row_group)) => {
+                            self.input = Some(input);
+                            Box::new(row_group)
+                        }
+                        Err(e) => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                    };
+
+                    let parquet_schema = self.metadata.file_metadata().schema_descr_ptr();
+
+                    let array_reader = build_array_reader(
+                        parquet_schema,
+                        self.schema.clone(),
+                        self.columns.iter().cloned(),
+                        row_group,
+                    )?;
+
+                    let batch_reader =
+                        ParquetRecordBatchReader::try_new(self.batch_size, array_reader)
+                            .expect("reader");
+
+                    self.state = StreamState::Decoding(batch_reader)
+                }
+                StreamState::Error => return Poll::Pending,
+            }
+        }
+    }
+}
+
+async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
+    input: &mut T,
+) -> Result<ParquetMetaData> {
+    input.seek(SeekFrom::End(-8)).await?;
+
+    let mut buf = [0_u8; 8];
+    input.read_exact(&mut buf).await?;
+
+    if buf[4..] != PARQUET_MAGIC {
+        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
+    }
+
+    let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
+    if metadata_len < 0 {
+        return Err(general_err!(
+            "Invalid Parquet file. Metadata length is less than zero ({})",
+            metadata_len
+        ));
+    }
+
+    input.seek(SeekFrom::End(-8 - metadata_len)).await?;
+
+    let mut buf = Vec::with_capacity(metadata_len as usize + 8);
+    input.read_to_end(&mut buf).await?;
+
+    parse_metadata_buffer(&mut Cursor::new(buf))
+}
+
+struct InMemoryRowGroup {
+    schema: SchemaDescPtr,
+    column_chunks: Vec<Option<InMemoryColumnChunk>>,
+}
+
+impl RowGroupCollection for InMemoryRowGroup {
+    fn schema(&self) -> Result<SchemaDescPtr> {
+        Ok(self.schema.clone())
+    }
+
+    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
+
+        Ok(Box::new(ColumnChunkIterator {
+            schema: self.schema.clone(),
+            column_schema: self.schema.columns()[i].clone(),
+            reader: Some(page_reader),
+        }))
+    }
+}
+
+#[derive(Clone)]
+struct InMemoryColumnChunk {
+    num_values: i64,
+    compression: Compression,
+    physical_type: crate::basic::Type,
+    data: ByteBufferPtr,
+}
+
+impl InMemoryColumnChunk {
+    fn pages(&self) -> Result<Box<dyn PageReader>> {
+        let page_reader = SerializedPageReader::new(
+            Cursor::new(self.data.clone()),
+            self.num_values,
+            self.compression,
+            self.physical_type,
+        )?;
+
+        Ok(Box::new(page_reader))
+    }
+}
+
+struct ColumnChunkIterator {
+    schema: SchemaDescPtr,
+    column_schema: ColumnDescPtr,
+    reader: Option<Result<Box<dyn PageReader>>>,
+}
+
+impl Iterator for ColumnChunkIterator {
+    type Item = Result<Box<dyn PageReader>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.reader.take()
+    }
+}
+
+impl PageIterator for ColumnChunkIterator {
+    fn schema(&mut self) -> Result<SchemaDescPtr> {
+        Ok(self.schema.clone())
+    }
+
+    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+        Ok(self.column_schema.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::util::pretty::pretty_format_batches;
+    use futures::TryStreamExt;
+    use tokio::fs::File;
+
+    use super::*;
+
+    fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+        let formatted = pretty_format_batches(batches).unwrap().to_string();
+        let actual_lines: Vec<_> = formatted.trim().lines().collect();
+        assert_eq!(
+            &actual_lines, expected_lines,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected_lines, actual_lines
+        );
+    }
+
+    #[tokio::test]
+    async fn test_parquet_stream() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let file = File::open(path).await.unwrap();
+
+        let builder = ParquetRecordBatchStreamBuilder::new(file)
+            .await
+            .unwrap()
+            .with_projection(vec![1, 2, 6])
+            .with_batch_size(3);
+
+        let stream = builder.build().unwrap();
+
+        let results = stream.try_collect::<Vec<_>>().await.unwrap();
+        assert_eq!(results.len(), 3);
+
+        assert_batches_eq(
+            &results,
+            &[
+                "+----------+-------------+-----------+",
+                "| bool_col | tinyint_col | float_col |",
+                "+----------+-------------+-----------+",
+                "| true     | 0           | 0         |",
+                "| false    | 1           | 1.1       |",
+                "| true     | 0           | 0         |",
+                "| false    | 1           | 1.1       |",
+                "| true     | 0           | 0         |",
+                "| false    | 1           | 1.1       |",
+                "| true     | 0           | 0         |",
+                "| false    | 1           | 1.1       |",
+                "+----------+-------------+-----------+",
+            ],
+        );
+    }

Review comment:
       Other tests that might be cool
   1. error cases (where projection is out of bounds, row group out of bounds).
   2. row group filter (as in read a multi-row group parquet file but only read one of the row groups)
   

##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,481 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for reading parquet files into
+//! arrow [`RecordBatch`]
+
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::util::memory::ByteBufferPtr;
+
+/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+///
+/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// to use this information to select what specific columns, row groups, etc...
+/// they wish to be read by the resulting stream
+///
+pub struct ParquetRecordBatchStreamBuilder<T> {
+    input: T,
+
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    row_groups: Option<Vec<usize>>,
+
+    projection: Option<Vec<usize>>,
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    pub async fn new(mut input: T) -> Result<Self> {
+        let metadata = Arc::new(read_footer(&mut input).await?);
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            metadata.file_metadata().key_value_metadata(),
+        )?);
+
+        Ok(Self {
+            input,
+            metadata,
+            schema,
+            batch_size: 1024,
+            row_groups: None,
+            projection: None,
+        })
+    }
+
+    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.metadata
+    }
+
+    /// Returns the arrow [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Set the size of [`RecordBatch`] to produce
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Only read data from the provided row group indexes
+    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
+    /// Only read data from the provided column indexes
+    pub fn with_projection(self, projection: Vec<usize>) -> Self {
+        Self {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Build a new [`ParquetRecordBatchStream`]
+    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
+        let num_columns = self.schema.fields().len();
+        let num_row_groups = self.metadata.row_groups().len();
+
+        let columns = match self.projection {
+            Some(projection) => {
+                if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
+                    return Err(general_err!(
+                        "column projection {} outside bounds of schema 0..{}",
+                        col,
+                        num_columns
+                    ));
+                }
+                projection
+            }
+            None => (0..num_columns).collect::<Vec<_>>(),
+        };
+
+        let row_groups = match self.row_groups {
+            Some(row_groups) => {
+                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
+                    return Err(general_err!(
+                        "row group {} out of bounds 0..{}",
+                        col,
+                        num_row_groups
+                    ));
+                }
+                row_groups.into()
+            }
+            None => (0..self.metadata.row_groups().len()).collect(),
+        };
+
+        Ok(ParquetRecordBatchStream {
+            row_groups,
+            columns: columns.into(),
+            batch_size: self.batch_size,
+            metadata: self.metadata,
+            schema: self.schema,
+            input: Some(self.input),
+            state: StreamState::Init,
+        })
+    }
+}
+
+enum StreamState<T> {
+    Init,
+    /// Decoding a batch
+    Decoding(ParquetRecordBatchReader),
+    /// Reading data from input
+    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    /// Error
+    Error,
+}
+
+impl<T> std::fmt::Debug for StreamState<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            StreamState::Init => write!(f, "StreamState::Init"),
+            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
+            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
+            StreamState::Error => write!(f, "StreamState::Error"),
+        }
+    }
+}
+
+/// A [`Stream`] of [`RecordBatch`] for a parquet file
+pub struct ParquetRecordBatchStream<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    columns: Arc<[usize]>,
+
+    row_groups: VecDeque<usize>,
+
+    /// This is an option so it can be moved into a future
+    input: Option<T>,
+
+    state: StreamState<T>,
+}
+
+impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchStream")
+            .field("metadata", &self.metadata)
+            .field("schema", &self.schema)
+            .field("batch_size", &self.batch_size)
+            .field("columns", &self.columns)
+            .field("state", &self.state)
+            .finish()
+    }
+}
+
+impl<T> ParquetRecordBatchStream<T> {
+    /// Returns the [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
+    for ParquetRecordBatchStream<T>
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            match &mut self.state {
+                StreamState::Decoding(batch_reader) => match batch_reader.next() {
+                    Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
+                    Some(Err(e)) => {
+                        self.state = StreamState::Error;
+                        return Poll::Ready(Some(Err(ParquetError::ArrowError(
+                            e.to_string(),
+                        ))));
+                    }
+                    None => self.state = StreamState::Init,
+                },
+                StreamState::Init => {
+                    let row_group_idx = match self.row_groups.pop_front() {
+                        Some(idx) => idx,
+                        None => return Poll::Ready(None),
+                    };
+
+                    let metadata = self.metadata.clone();
+                    let mut input = match self.input.take() {
+                        Some(input) => input,
+                        None => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(general_err!(
+                                "input stream lost"
+                            ))));
+                        }
+                    };
+
+                    let columns = Arc::clone(&self.columns);
+
+                    self.state = StreamState::Reading(
+                        async move {
+                            let row_group_metadata = metadata.row_group(row_group_idx);
+                            let mut column_chunks =
+                                vec![None; row_group_metadata.columns().len()];
+
+                            for column_idx in columns.iter() {

Review comment:
       my reading of this code is that it reads / decodes the data one column at a time (aka does not overlap IO and CPU). This is fine, I just want to confirm my understanding




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1026869178


   Added an example in  https://github.com/apache/arrow-rs/pull/1253 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1154: Async arrow parquet reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1009835577


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1154](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (78f71ab) into [master](https://codecov.io/gh/apache/arrow-rs/commit/aa71aeaa3d6f0345690490588640226701e1ac15?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aa71aea) will **decrease** coverage by `0.06%`.
   > The diff coverage is `8.92%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1154/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1154      +/-   ##
   ==========================================
   - Coverage   82.96%   82.89%   -0.07%     
   ==========================================
     Files         178      180       +2     
     Lines       51522    51944     +422     
   ==========================================
   + Hits        42744    43060     +316     
   - Misses       8778     8884     +106     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Ξ” | |
   |---|---|---|
   | [parquet/src/arrow/async\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXN5bmNfcmVhZGVyLnJz) | `0.00% <0.00%> (ΓΈ)` | |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `94.07% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/arrow/record\_reader/buffer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9idWZmZXIucnM=) | `92.42% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/compression.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29tcHJlc3Npb24ucnM=) | `88.59% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/data\_type.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZGF0YV90eXBlLnJz) | `76.61% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/encodings/decoding.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZW5jb2RpbmdzL2RlY29kaW5nLnJz) | `90.45% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/file/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9yZWFkZXIucnM=) | `75.47% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/file/serialized\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9zZXJpYWxpemVkX3JlYWRlci5ycw==) | `94.37% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/util/test\_common/page\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC90ZXN0X2NvbW1vbi9wYWdlX3V0aWwucnM=) | `88.88% <ΓΈ> (ΓΈ)` | |
   | ... and [9 more](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Ξ” = absolute <relative> (impact)`, `ΓΈ = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [aa71aea...78f71ab](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r795049009



##########
File path: parquet/src/arrow/async_reader.rs
##########
@@ -0,0 +1,481 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for reading parquet files into
+//! arrow [`RecordBatch`]
+
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::util::memory::ByteBufferPtr;
+
+/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+///
+/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// to use this information to select what specific columns, row groups, etc...
+/// they wish to be read by the resulting stream
+///
+pub struct ParquetRecordBatchStreamBuilder<T> {
+    input: T,
+
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    row_groups: Option<Vec<usize>>,
+
+    projection: Option<Vec<usize>>,
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    pub async fn new(mut input: T) -> Result<Self> {
+        let metadata = Arc::new(read_footer(&mut input).await?);
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            metadata.file_metadata().key_value_metadata(),
+        )?);
+
+        Ok(Self {
+            input,
+            metadata,
+            schema,
+            batch_size: 1024,
+            row_groups: None,
+            projection: None,
+        })
+    }
+
+    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.metadata
+    }
+
+    /// Returns the arrow [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Set the size of [`RecordBatch`] to produce
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Only read data from the provided row group indexes
+    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
+    /// Only read data from the provided column indexes
+    pub fn with_projection(self, projection: Vec<usize>) -> Self {
+        Self {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Build a new [`ParquetRecordBatchStream`]
+    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
+        let num_columns = self.schema.fields().len();
+        let num_row_groups = self.metadata.row_groups().len();
+
+        let columns = match self.projection {
+            Some(projection) => {
+                if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
+                    return Err(general_err!(
+                        "column projection {} outside bounds of schema 0..{}",
+                        col,
+                        num_columns
+                    ));
+                }
+                projection
+            }
+            None => (0..num_columns).collect::<Vec<_>>(),
+        };
+
+        let row_groups = match self.row_groups {
+            Some(row_groups) => {
+                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
+                    return Err(general_err!(
+                        "row group {} out of bounds 0..{}",
+                        col,
+                        num_row_groups
+                    ));
+                }
+                row_groups.into()
+            }
+            None => (0..self.metadata.row_groups().len()).collect(),
+        };
+
+        Ok(ParquetRecordBatchStream {
+            row_groups,
+            columns: columns.into(),
+            batch_size: self.batch_size,
+            metadata: self.metadata,
+            schema: self.schema,
+            input: Some(self.input),
+            state: StreamState::Init,
+        })
+    }
+}
+
+enum StreamState<T> {
+    Init,
+    /// Decoding a batch
+    Decoding(ParquetRecordBatchReader),
+    /// Reading data from input
+    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    /// Error
+    Error,
+}
+
+impl<T> std::fmt::Debug for StreamState<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            StreamState::Init => write!(f, "StreamState::Init"),
+            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
+            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
+            StreamState::Error => write!(f, "StreamState::Error"),
+        }
+    }
+}
+
+/// A [`Stream`] of [`RecordBatch`] for a parquet file
+pub struct ParquetRecordBatchStream<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    columns: Arc<[usize]>,
+
+    row_groups: VecDeque<usize>,
+
+    /// This is an option so it can be moved into a future
+    input: Option<T>,
+
+    state: StreamState<T>,
+}
+
+impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchStream")
+            .field("metadata", &self.metadata)
+            .field("schema", &self.schema)
+            .field("batch_size", &self.batch_size)
+            .field("columns", &self.columns)
+            .field("state", &self.state)
+            .finish()
+    }
+}
+
+impl<T> ParquetRecordBatchStream<T> {
+    /// Returns the [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
+    for ParquetRecordBatchStream<T>
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            match &mut self.state {
+                StreamState::Decoding(batch_reader) => match batch_reader.next() {
+                    Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
+                    Some(Err(e)) => {
+                        self.state = StreamState::Error;
+                        return Poll::Ready(Some(Err(ParquetError::ArrowError(
+                            e.to_string(),
+                        ))));
+                    }
+                    None => self.state = StreamState::Init,
+                },
+                StreamState::Init => {
+                    let row_group_idx = match self.row_groups.pop_front() {
+                        Some(idx) => idx,
+                        None => return Poll::Ready(None),
+                    };
+
+                    let metadata = self.metadata.clone();
+                    let mut input = match self.input.take() {
+                        Some(input) => input,
+                        None => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(general_err!(
+                                "input stream lost"
+                            ))));
+                        }
+                    };
+
+                    let columns = Arc::clone(&self.columns);
+
+                    self.state = StreamState::Reading(
+                        async move {
+                            let row_group_metadata = metadata.row_group(row_group_idx);
+                            let mut column_chunks =
+                                vec![None; row_group_metadata.columns().len()];
+
+                            for column_idx in columns.iter() {

Review comment:
       Yes it will read the bytes for each column chunk sequentially. It is worth noting that the actual decompression / decode takes place lazily in `StreamState::Decoding` not here - this just fetches the raw bytes.
   
   This abstraction does theoretically allow for query engines to exploit parallelism at the granularity of the row group, but practically speaking I'd imagine most will just stick to a file and be done with it. For almost all workloads the number of parquet files will outnumber the core count anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter commented on pull request #1154: POC: Async parquet reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1009835577


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1154](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3992bcd) into [master](https://codecov.io/gh/apache/arrow-rs/commit/5302b92012e86c5c966378b1564027466a545430?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5302b92) will **decrease** coverage by `0.15%`.
   > The diff coverage is `23.18%`.
   
   > :exclamation: Current head 3992bcd differs from pull request most recent head 7eda456. Consider uploading reports for the commit 7eda456 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1154/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1154      +/-   ##
   ==========================================
   - Coverage   82.56%   82.40%   -0.16%     
   ==========================================
     Files         169      170       +1     
     Lines       50497    50614     +117     
   ==========================================
   + Hits        41691    41707      +16     
   - Misses       8806     8907     +101     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Ξ” | |
   |---|---|---|
   | [parquet/src/arrow/async\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXN5bmNfcmVhZGVyLnJz) | `0.00% <0.00%> (ΓΈ)` | |
   | [parquet/src/file/footer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9mb290ZXIucnM=) | `95.37% <66.66%> (+0.04%)` | :arrow_up: |
   | [parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyLnJz) | `78.04% <76.92%> (-0.22%)` | :arrow_down: |
   | [parquet/src/arrow/arrow\_array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfYXJyYXlfcmVhZGVyLnJz) | `79.54% <100.00%> (+0.29%)` | :arrow_up: |
   | [parquet/src/arrow/arrow\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfcmVhZGVyLnJz) | `89.32% <100.00%> (ΓΈ)` | |
   | [arrow/src/datatypes/field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9maWVsZC5ycw==) | `54.10% <0.00%> (+0.30%)` | :arrow_up: |
   | [arrow/src/datatypes/datatype.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9kYXRhdHlwZS5ycw==) | `66.80% <0.00%> (+0.42%)` | :arrow_up: |
   | [parquet\_derive/src/parquet\_field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldF9kZXJpdmUvc3JjL3BhcnF1ZXRfZmllbGQucnM=) | `66.43% <0.00%> (+0.45%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Ξ” = absolute <relative> (impact)`, `ΓΈ = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5302b92...7eda456](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1031498813


   > Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are Send. Is there a particular one causing an issue, as we could potentially feature gate it behind the async feature flag?
   
   
   I am also interested in what issues (if any) adding the `Send` constraint has / will cause.
   
   @zeevm  if you have some time  and are willing to help make the `async` parquet reader more sophisticated in terms of reading only what is needed, we would love to welcome your contributions ❀️  -- we are only just beginning to improve in this area.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030632686


   > the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   I agree, in so much as whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, I would anticipate using to refine the data `ParquetRecordBatchStream` fetches prior to decode. That being said, currently this crate doesn't even support decoding the index pages, see [here](https://github.com/apache/arrow-rs/blob/master/parquet/src/file/serialized_reader.rs#L388), let alone doing anything with them :sweat_smile: 
   
   > so continuously downloading in the background for data the client
   
   This PR does not add functionality for doing this, it adds hooks for a query engine to use for doing this by providing something implementing `AsyncRead` and `AsyncSeek`. This has been a frequent ask within Datafusion and https://github.com/apache/arrow-datafusion/pull/1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actual fetching of data from object storage, save for requesting the necessary blocks of data from the reader implementation, limited by any projections, filters, or row group selections pushed down by the query engine.
   
   > complicating all existing client by the added "Send" constraint.
   
   Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are `Send`. Is there a particular one causing an issue, as we could potentially feature gate it behind the `async` feature flag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1154: POC: `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r788060131



##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -1,1922 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       This is slated for removal #1197 and so I didn't spend time working out how to make it Send, and just deleted it. Will split into a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1154: POC: Async parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r782006587



##########
File path: parquet/src/arrow/array_reader.rs
##########
@@ -100,6 +100,26 @@ pub trait ArrayReader {
     fn get_rep_levels(&self) -> Option<&[i16]>;
 }
 
+/// A collection of row groups
+pub trait RowGroupCollection {
+    /// Get schema of parquet file.
+    fn schema(&self) -> Result<SchemaDescPtr>;
+
+    /// Returns an iterator over the column chunks for particular column
+    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
+}
+
+impl RowGroupCollection for Arc<dyn FileReader> {

Review comment:
       This does mean we have double dynamic dispatch, given these methods are called a couple of times per-file I'm inclined to consider this largely irrelevant




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1154: POC: `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r794926498



##########
File path: parquet/src/file/reader.rs
##########
@@ -43,8 +43,8 @@ pub trait Length {
 /// The ChunkReader trait generates readers of chunks of a source.
 /// For a file system reader, each chunk might contain a clone of File bounded on a given range.
 /// For an object store reader, each read can be mapped to a range request.
-pub trait ChunkReader: Length {
-    type T: Read;
+pub trait ChunkReader: Length + Send + Sync {

Review comment:
       These traits need to be both `Send + Sync` as they are used through immutable references, e.g. `Arc`

##########
File path: parquet/src/file/serialized_reader.rs
##########
@@ -271,15 +271,15 @@ impl<T: Read> SerializedPageReader<T> {
     }
 }
 
-impl<T: Read> Iterator for SerializedPageReader<T> {
+impl<T: Read + Send> Iterator for SerializedPageReader<T> {
     type Item = Result<Page>;
 
     fn next(&mut self) -> Option<Self::Item> {
         self.get_next_page().transpose()
     }
 }
 
-impl<T: Read> PageReader for SerializedPageReader<T> {
+impl<T: Read + Send> PageReader for SerializedPageReader<T> {

Review comment:
       As `PageReader: Send` it can only be implemented for types that are `Send` which is only the case for `SerializedPageReader<T>` if `T: Send`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] houqp commented on pull request #1154: POC: `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1010531670


   Pretty cool demo @tustvold :+1: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030632686


   > the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   I agree, in so much as whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, I would anticipate using to refine the data `ParquetRecordBatchStream` fetches prior to decode. That being said, currently this crate doesn't even support decoding the index pages, see [here](https://github.com/apache/arrow-rs/blob/master/parquet/src/file/serialized_reader.rs#L388), let alone doing anything with them :sweat_smile: 
   
   > so continuously downloading in the background for data the client
   
   This PR does not add functionality for doing this, it adds hooks for a query engine to use for doing this by providing something implementing `AsyncRead` and `AsyncSeek`. This has been a frequent ask within Datafusion and https://github.com/apache/arrow-datafusion/pull/1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actually fetching data from object storage
   
   > complicating all existing client by the added "Send" constraint.
   
   Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are `Send`. Is there a particular one causing an issue, as we could potentially feature gate it behind the `async` feature flag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1030632686


   > the notion that the column chunk is the basic i/o unit for Parquet is somewhat outdates with the introduction of the index page.
   
   I agree, in so much as whatever mechanism we eventually add for more granular filter pushdown, be it the page index or something more sophisticated such as described in #1191, I would anticipate using to refine the data `ParquetRecordBatchStream` fetches prior to decode. That being said, currently this crate doesn't even support decoding the index pages, let alone doing anything with them :sweat_smile: 
   
   > so continuously downloading in the background for data the client
   
   This PR does not add functionality for doing this, it adds hooks for a query engine to use for doing this by providing something implementing `AsyncRead` and `AsyncSeek`. This has been a frequent ask within Datafusion and https://github.com/apache/arrow-datafusion/pull/1617 begins to flesh out how this might look. The parquet crate would not have anything to do with the actually fetching data from object storage
   
   > complicating all existing client by the added "Send" constraint.
   
   Are these additions this causing an issue for you? I have to confess I did not anticipate this causing issues, as almost all types are `Send`. Is there a particular one causing an issue, as we could potentially feature gate it behind the `async` feature flag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1154: POC: `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1024732701


   With https://github.com/apache/arrow-datafusion/pull/1617 I'm happy with the interface, so I'm marking this ready for review. 
   
   I'll work on getting some better coverage, e.g. the fuzz tests, over the coming days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1154: POC: `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1009835577


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1154](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (819913a) into [master](https://codecov.io/gh/apache/arrow-rs/commit/aa71aeaa3d6f0345690490588640226701e1ac15?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aa71aea) will **decrease** coverage by `0.07%`.
   > The diff coverage is `7.83%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1154/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1154      +/-   ##
   ==========================================
   - Coverage   82.96%   82.88%   -0.08%     
   ==========================================
     Files         178      180       +2     
     Lines       51522    51944     +422     
   ==========================================
   + Hits        42744    43056     +312     
   - Misses       8778     8888     +110     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Ξ” | |
   |---|---|---|
   | [parquet/src/arrow/async\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXN5bmNfcmVhZGVyLnJz) | `0.00% <0.00%> (ΓΈ)` | |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `94.07% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/arrow/record\_reader/buffer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9idWZmZXIucnM=) | `92.42% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/compression.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29tcHJlc3Npb24ucnM=) | `88.59% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/data\_type.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZGF0YV90eXBlLnJz) | `76.61% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/encodings/decoding.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZW5jb2RpbmdzL2RlY29kaW5nLnJz) | `90.45% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/file/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9yZWFkZXIucnM=) | `75.47% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/file/serialized\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZmlsZS9zZXJpYWxpemVkX3JlYWRlci5ycw==) | `94.37% <ΓΈ> (ΓΈ)` | |
   | [parquet/src/util/test\_common/page\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC90ZXN0X2NvbW1vbi9wYWdlX3V0aWwucnM=) | `88.88% <ΓΈ> (ΓΈ)` | |
   | ... and [10 more](https://codecov.io/gh/apache/arrow-rs/pull/1154/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Ξ” = absolute <relative> (impact)`, `ΓΈ = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [aa71aea...819913a](https://codecov.io/gh/apache/arrow-rs/pull/1154?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1154: Async arrow parquet reader

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#discussion_r794970546



##########
File path: arrow/src/util/pretty.rs
##########
@@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
             let mut cells = Vec::new();
             for col in 0..batch.num_columns() {
                 let column = batch.column(col);
-                cells.push(Cell::new(&array_value_to_string(&column, row)?));
+                cells.push(Cell::new(&array_value_to_string(column, row)?));

Review comment:
       Activating pretty_print in parquet appears to have made clippy find a load of new stuff in arrow :sweat_smile: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on pull request #1154: Add `async` arrow parquet reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1154:
URL: https://github.com/apache/arrow-rs/pull/1154#issuecomment-1029291168


   tracking API change in https://github.com/apache/arrow-rs/issues/1264 (for changelog)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org