You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/11 13:12:28 UTC
[arrow-rs] branch master updated: Document Async decoder usage (#4043) (#78) (#4046)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 884ab4edf Document Async decoder usage (#4043) (#78) (#4046)
884ab4edf is described below
commit 884ab4edf2c89d527e3408de1661bb1555640e8b
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Apr 11 14:12:22 2023 +0100
Document Async decoder usage (#4043) (#78) (#4046)
* Document Async decoder usage (#4043) (#78)
* Review feedback
* Review feedback
---
arrow-csv/Cargo.toml | 3 ++
arrow-csv/src/reader/mod.rs | 83 ++++++++++++++++++++++++++++++++++
arrow-json/Cargo.toml | 3 ++
arrow-json/src/raw/mod.rs | 108 ++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 197 insertions(+)
diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml
index d4526ba32..1f1a762d5 100644
--- a/arrow-csv/Cargo.toml
+++ b/arrow-csv/Cargo.toml
@@ -48,3 +48,6 @@ regex = { version = "1.7.0", default-features = false, features = ["std", "unico
[dev-dependencies]
tempfile = "3.3"
+futures = "0.3"
+tokio = { version = "1.27", default-features = false, features = ["io-util"] }
+bytes = "1.4"
diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 3fa712819..5bfcbc645 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -17,6 +17,8 @@
//! CSV Reader
//!
+//! # Basic Usage
+//!
//! This CSV reader allows CSV files to be read into the Arrow memory model. Records are
//! loaded in batches and are then converted from row-based data to columnar data.
//!
@@ -39,6 +41,87 @@
//! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
//! let batch = csv.next().unwrap().unwrap();
//! ```
+//!
+//! # Async Usage
+//!
+//! The lower-level [`Decoder`] can be integrated with various forms of async data streams,
+//! and is designed to be agnostic to the different kinds of async IO primitives found
+//! within the Rust ecosystem.
+//!
+//! For example, the following shows how it can be used with an arbitrary `Stream` of `Bytes`:
+//!
+//! ```
+//! # use std::task::{Poll, ready};
+//! # use bytes::{Buf, Bytes};
+//! # use arrow_schema::ArrowError;
+//! # use futures::stream::{Stream, StreamExt};
+//! # use arrow_array::RecordBatch;
+//! # use arrow_csv::reader::Decoder;
+//! #
+//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
+//! mut decoder: Decoder,
+//! mut input: S,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//! let mut buffered = Bytes::new();
+//! futures::stream::poll_fn(move |cx| {
+//! loop {
+//! if buffered.is_empty() {
+//! if let Some(b) = ready!(input.poll_next_unpin(cx)) {
+//! buffered = b;
+//! }
+//!                 // Note: don't break on `None` as the decoder needs
+//!                 // to be called with an empty slice to delimit the
+//!                 // final record
+//! }
+//! let decoded = match decoder.decode(buffered.as_ref()) {
+//! Ok(0) => break,
+//! Ok(decoded) => decoded,
+//! Err(e) => return Poll::Ready(Some(Err(e))),
+//! };
+//! buffered.advance(decoded);
+//! }
+//!
+//! Poll::Ready(decoder.flush().transpose())
+//! })
+//! }
+//!
+//! ```
+//!
+//! In a similar vein, it can also be used with tokio-based IO primitives, such as `tokio::io::AsyncBufRead`:
+//!
+//! ```
+//! # use std::pin::Pin;
+//! # use std::task::{Poll, ready};
+//! # use futures::Stream;
+//! # use tokio::io::AsyncBufRead;
+//! # use arrow_array::RecordBatch;
+//! # use arrow_csv::reader::Decoder;
+//! # use arrow_schema::ArrowError;
+//! fn decode_stream<R: AsyncBufRead + Unpin>(
+//! mut decoder: Decoder,
+//! mut reader: R,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//! futures::stream::poll_fn(move |cx| {
+//! loop {
+//! let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
+//! Ok(b) => b,
+//! Err(e) => return Poll::Ready(Some(Err(e.into()))),
+//! };
+//! let decoded = match decoder.decode(b) {
+//!                 // Note: the decoder needs to be called with an empty
+//!                 // slice to delimit the final record
+//! Ok(0) => break,
+//! Ok(decoded) => decoded,
+//! Err(e) => return Poll::Ready(Some(Err(e))),
+//! };
+//! Pin::new(&mut reader).consume(decoded);
+//! }
+//!
+//! Poll::Ready(decoder.flush().transpose())
+//! })
+//! }
+//! ```
+//!
mod records;
diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml
index 453e4aa35..d9b3a0df9 100644
--- a/arrow-json/Cargo.toml
+++ b/arrow-json/Cargo.toml
@@ -51,3 +51,6 @@ lexical-core = { version = "0.8", default-features = false }
tempfile = "3.3"
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
+futures = "0.3"
+tokio = { version = "1.27", default-features = false, features = ["io-util"] }
+bytes = "1.4"
diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs
index f1f1ffb77..c19552476 100644
--- a/arrow-json/src/raw/mod.rs
+++ b/arrow-json/src/raw/mod.rs
@@ -18,6 +18,114 @@
//! A faster JSON reader that will eventually replace [`Reader`]
//!
//! [`Reader`]: crate::reader::Reader
+//!
+//! # Basic Usage
+//!
+//! [`RawReader`] can be used directly with synchronous data sources, such as [`std::fs::File`]:
+//!
+//! ```
+//! # use arrow_schema::*;
+//! # use std::fs::File;
+//! # use std::io::BufReader;
+//! # use std::sync::Arc;
+//!
+//! let schema = Arc::new(Schema::new(vec![
+//! Field::new("a", DataType::Float64, false),
+//! Field::new("b", DataType::Float64, false),
+//! Field::new("c", DataType::Boolean, true),
+//! ]));
+//!
+//! let file = File::open("test/data/basic.json").unwrap();
+//!
+//! let mut json = arrow_json::RawReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
+//! let batch = json.next().unwrap().unwrap();
+//! ```
+//!
+//! # Async Usage
+//!
+//! The lower-level [`RawDecoder`] can be integrated with various forms of async data streams,
+//! and is designed to be agnostic to the different kinds of async IO primitives found
+//! within the Rust ecosystem.
+//!
+//! For example, the following shows how it can be used with an arbitrary `Stream` of `Bytes`:
+//!
+//! ```
+//! # use std::task::{Poll, ready};
+//! # use bytes::{Buf, Bytes};
+//! # use arrow_schema::ArrowError;
+//! # use futures::stream::{Stream, StreamExt};
+//! # use arrow_array::RecordBatch;
+//! # use arrow_json::RawDecoder;
+//! #
+//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
+//! mut decoder: RawDecoder,
+//! mut input: S,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//! let mut buffered = Bytes::new();
+//! futures::stream::poll_fn(move |cx| {
+//! loop {
+//! if buffered.is_empty() {
+//! buffered = match ready!(input.poll_next_unpin(cx)) {
+//! Some(b) => b,
+//! None => break,
+//! };
+//! }
+//! let decoded = match decoder.decode(buffered.as_ref()) {
+//! Ok(decoded) => decoded,
+//! Err(e) => return Poll::Ready(Some(Err(e))),
+//! };
+//! let read = buffered.len();
+//! buffered.advance(decoded);
+//! if decoded != read {
+//! break
+//! }
+//! }
+//!
+//! Poll::Ready(decoder.flush().transpose())
+//! })
+//! }
+//!
+//! ```
+//!
+//! In a similar vein, it can also be used with tokio-based IO primitives, such as `tokio::io::AsyncBufRead`:
+//!
+//! ```
+//! # use std::sync::Arc;
+//! # use arrow_schema::{DataType, Field, Schema};
+//! # use std::pin::Pin;
+//! # use std::task::{Poll, ready};
+//! # use futures::{Stream, TryStreamExt};
+//! # use tokio::io::AsyncBufRead;
+//! # use arrow_array::RecordBatch;
+//! # use arrow_json::RawDecoder;
+//! # use arrow_schema::ArrowError;
+//! fn decode_stream<R: AsyncBufRead + Unpin>(
+//! mut decoder: RawDecoder,
+//! mut reader: R,
+//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//! futures::stream::poll_fn(move |cx| {
+//! loop {
+//! let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
+//! Ok(b) if b.is_empty() => break,
+//! Ok(b) => b,
+//! Err(e) => return Poll::Ready(Some(Err(e.into()))),
+//! };
+//! let read = b.len();
+//! let decoded = match decoder.decode(b) {
+//! Ok(decoded) => decoded,
+//! Err(e) => return Poll::Ready(Some(Err(e))),
+//! };
+//! Pin::new(&mut reader).consume(decoded);
+//! if decoded != read {
+//! break;
+//! }
+//! }
+//!
+//! Poll::Ready(decoder.flush().transpose())
+//! })
+//! }
+//! ```
+//!
use crate::raw::boolean_array::BooleanArrayDecoder;
use crate::raw::decimal_array::DecimalArrayDecoder;