You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/11 13:12:28 UTC

[arrow-rs] branch master updated: Document Async decoder usage (#4043) (#78) (#4046)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 884ab4edf Document Async decoder usage (#4043) (#78) (#4046)
884ab4edf is described below

commit 884ab4edf2c89d527e3408de1661bb1555640e8b
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Apr 11 14:12:22 2023 +0100

    Document Async decoder usage (#4043) (#78) (#4046)
    
    * Document Async decoder usage (#4043) (#78)
    
    * Review feedback
    
    * Review feedback
---
 arrow-csv/Cargo.toml        |   3 ++
 arrow-csv/src/reader/mod.rs |  83 ++++++++++++++++++++++++++++++++++
 arrow-json/Cargo.toml       |   3 ++
 arrow-json/src/raw/mod.rs   | 108 ++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 197 insertions(+)

diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml
index d4526ba32..1f1a762d5 100644
--- a/arrow-csv/Cargo.toml
+++ b/arrow-csv/Cargo.toml
@@ -48,3 +48,6 @@ regex = { version = "1.7.0", default-features = false, features = ["std", "unico
 
 [dev-dependencies]
 tempfile = "3.3"
+futures = "0.3"
+tokio = { version = "1.27", default-features = false, features = ["io-util"] }
+bytes = "1.4"
diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 3fa712819..5bfcbc645 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -17,6 +17,8 @@
 
 //! CSV Reader
 //!
+//! # Basic Usage
+//!
 //! This CSV reader allows CSV files to be read into the Arrow memory model. Records are
 //! loaded in batches and are then converted from row-based data to columnar data.
 //!
@@ -39,6 +41,87 @@
 //! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
+//!
+//! # Async Usage
+//!
+//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
+//! and is designed to be agnostic to the different kinds of async IO primitives found
+//! within the Rust ecosystem.
+//!
+//! For example, the following shows how it can be used with an arbitrary `Stream` of `Bytes`:
+//!
+//! ```
+//! # use std::task::{Poll, ready};
+//! # use bytes::{Buf, Bytes};
+//! # use arrow_schema::ArrowError;
+//! # use futures::stream::{Stream, StreamExt};
+//! # use arrow_array::RecordBatch;
+//! # use arrow_csv::reader::Decoder;
+//! #
+//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
+//!     mut decoder: Decoder,
+//!     mut input: S,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//!     let mut buffered = Bytes::new();
+//!     futures::stream::poll_fn(move |cx| {
+//!         loop {
+//!             if buffered.is_empty() {
+//!                 if let Some(b) = ready!(input.poll_next_unpin(cx)) {
+//!                     buffered = b;
+//!                 }
+//!                 // Note: don't break on `None` as the decoder needs
+//!                 // to be called with an empty array to delimit the
+//!                 // final record
+//!             }
+//!             let decoded = match decoder.decode(buffered.as_ref()) {
+//!                 Ok(0) => break,
+//!                 Ok(decoded) => decoded,
+//!                 Err(e) => return Poll::Ready(Some(Err(e))),
+//!             };
+//!             buffered.advance(decoded);
+//!         }
+//!
+//!         Poll::Ready(decoder.flush().transpose())
+//!     })
+//! }
+//!
+//! ```
+//!
+//! In a similar vein, it can also be used with tokio-based IO primitives such as `AsyncBufRead`:
+//!
+//! ```
+//! # use std::pin::Pin;
+//! # use std::task::{Poll, ready};
+//! # use futures::Stream;
+//! # use tokio::io::AsyncBufRead;
+//! # use arrow_array::RecordBatch;
+//! # use arrow_csv::reader::Decoder;
+//! # use arrow_schema::ArrowError;
+//! fn decode_stream<R: AsyncBufRead + Unpin>(
+//!     mut decoder: Decoder,
+//!     mut reader: R,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//!     futures::stream::poll_fn(move |cx| {
+//!         loop {
+//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
+//!                 Ok(b) => b,
+//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
+//!             };
+//!             let decoded = match decoder.decode(b) {
+//!                 // Note: the decoder needs to be called with an empty
+//!                 // array to delimit the final record
+//!                 Ok(0) => break,
+//!                 Ok(decoded) => decoded,
+//!                 Err(e) => return Poll::Ready(Some(Err(e))),
+//!             };
+//!             Pin::new(&mut reader).consume(decoded);
+//!         }
+//!
+//!         Poll::Ready(decoder.flush().transpose())
+//!     })
+//! }
+//! ```
+//!
 
 mod records;
 
diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml
index 453e4aa35..d9b3a0df9 100644
--- a/arrow-json/Cargo.toml
+++ b/arrow-json/Cargo.toml
@@ -51,3 +51,6 @@ lexical-core = { version = "0.8", default-features = false }
 tempfile = "3.3"
 flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
 serde = { version = "1.0", default-features = false, features = ["derive"] }
+futures = "0.3"
+tokio = { version = "1.27", default-features = false, features = ["io-util"] }
+bytes = "1.4"
diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs
index f1f1ffb77..c19552476 100644
--- a/arrow-json/src/raw/mod.rs
+++ b/arrow-json/src/raw/mod.rs
@@ -18,6 +18,114 @@
 //! A faster JSON reader that will eventually replace [`Reader`]
 //!
 //! [`Reader`]: crate::reader::Reader
+//!
+//! # Basic Usage
+//!
+//! [`RawReader`] can be used directly with synchronous data sources, such as [`std::fs::File`]:
+//!
+//! ```
+//! # use arrow_schema::*;
+//! # use std::fs::File;
+//! # use std::io::BufReader;
+//! # use std::sync::Arc;
+//!
+//! let schema = Arc::new(Schema::new(vec![
+//!     Field::new("a", DataType::Float64, false),
+//!     Field::new("b", DataType::Float64, false),
+//!     Field::new("c", DataType::Boolean, true),
+//! ]));
+//!
+//! let file = File::open("test/data/basic.json").unwrap();
+//!
+//! let mut json = arrow_json::RawReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
+//! let batch = json.next().unwrap().unwrap();
+//! ```
+//!
+//! # Async Usage
+//!
+//! The lower-level [`RawDecoder`] can be integrated with various forms of async data streams,
+//! and is designed to be agnostic to the different kinds of async IO primitives found
+//! within the Rust ecosystem.
+//!
+//! For example, the following shows how it can be used with an arbitrary `Stream` of `Bytes`:
+//!
+//! ```
+//! # use std::task::{Poll, ready};
+//! # use bytes::{Buf, Bytes};
+//! # use arrow_schema::ArrowError;
+//! # use futures::stream::{Stream, StreamExt};
+//! # use arrow_array::RecordBatch;
+//! # use arrow_json::RawDecoder;
+//! #
+//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
+//!     mut decoder: RawDecoder,
+//!     mut input: S,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//!     let mut buffered = Bytes::new();
+//!     futures::stream::poll_fn(move |cx| {
+//!         loop {
+//!             if buffered.is_empty() {
+//!                 buffered = match ready!(input.poll_next_unpin(cx)) {
+//!                     Some(b) => b,
+//!                     None => break,
+//!                 };
+//!             }
+//!             let decoded = match decoder.decode(buffered.as_ref()) {
+//!                 Ok(decoded) => decoded,
+//!                 Err(e) => return Poll::Ready(Some(Err(e))),
+//!             };
+//!             let read = buffered.len();
+//!             buffered.advance(decoded);
+//!             if decoded != read {
+//!                 break
+//!             }
+//!         }
+//!
+//!         Poll::Ready(decoder.flush().transpose())
+//!     })
+//! }
+//!
+//! ```
+//!
+//! In a similar vein, it can also be used with tokio-based IO primitives such as `AsyncBufRead`:
+//!
+//! ```
+//! # use std::sync::Arc;
+//! # use arrow_schema::{DataType, Field, Schema};
+//! # use std::pin::Pin;
+//! # use std::task::{Poll, ready};
+//! # use futures::{Stream, TryStreamExt};
+//! # use tokio::io::AsyncBufRead;
+//! # use arrow_array::RecordBatch;
+//! # use arrow_json::RawDecoder;
+//! # use arrow_schema::ArrowError;
+//! fn decode_stream<R: AsyncBufRead + Unpin>(
+//!     mut decoder: RawDecoder,
+//!     mut reader: R,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//!     futures::stream::poll_fn(move |cx| {
+//!         loop {
+//!             let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
+//!                 Ok(b) if b.is_empty() => break,
+//!                 Ok(b) => b,
+//!                 Err(e) => return Poll::Ready(Some(Err(e.into()))),
+//!             };
+//!             let read = b.len();
+//!             let decoded = match decoder.decode(b) {
+//!                 Ok(decoded) => decoded,
+//!                 Err(e) => return Poll::Ready(Some(Err(e))),
+//!             };
+//!             Pin::new(&mut reader).consume(decoded);
+//!             if decoded != read {
+//!                 break;
+//!             }
+//!         }
+//!
+//!         Poll::Ready(decoder.flush().transpose())
+//!     })
+//! }
+//! ```
+//!
 
 use crate::raw::boolean_array::BooleanArrayDecoder;
 use crate::raw::decimal_array::DecimalArrayDecoder;