You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "crepererum (via GitHub)" <gi...@apache.org> on 2023/04/11 09:16:02 UTC

[GitHub] [arrow-rs] crepererum commented on a diff in pull request #4046: Document Async decoder usage (#4043) (#78)

crepererum commented on code in PR #4046:
URL: https://github.com/apache/arrow-rs/pull/4046#discussion_r1162528022


##########
arrow-csv/src/reader/mod.rs:
##########
@@ -39,6 +41,84 @@
 //! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
+//!
+//! # Async Usage
+//!
+//! The lower-level [`Decoder`] can be integrated with various forms of async data streams.
+//!
+//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
+//!
+//! ```
+//! # use std::task::{Poll, ready};
+//! # use bytes::{Buf, Bytes};
+//! # use arrow_schema::ArrowError;
+//! # use futures::stream::{Stream, StreamExt};
+//! # use arrow_array::RecordBatch;
+//! # use arrow_csv::reader::Decoder;
+//! #
+//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
+//!     mut decoder: Decoder,
+//!     mut input: S,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//!     let mut buffered = Bytes::new();
+//!     futures::stream::poll_fn(move |cx| {
+//!         loop {
+//!             if buffered.is_empty() {
+//!                 if let Some(b) = ready!(input.poll_next_unpin(cx)) {
+//!                     buffered = b;
+//!                 }
+//!             }
+//!             let decoded = match decoder.decode(buffered.as_ref()) {
+//!                 // Note: the decoder needs to be called with an empty
+//!                 // array to delimit the final record

Review Comment:
   Where is this done in this example? I would have expected that there's some `decoder.finish()` method instead of some sentinel value for this purpose (I've seen a similar API design w/ some compressor crates before).



##########
arrow-csv/src/reader/mod.rs:
##########
@@ -39,6 +41,84 @@
 //! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
+//!
+//! # Async Usage
+//!
+//! The lower-level [`Decoder`] can be integrated with various forms of async data streams.

Review Comment:
   This information should be part of the doc-string, not hidden in some PR comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org