Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/05 11:24:46 UTC
[arrow-rs] branch master updated: Improve arrow flight batch splitting and naming (#3444)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new e256e3de0 Improve arrow flight batch splitting and naming (#3444)
e256e3de0 is described below
commit e256e3de0c902a98dc15a13cfc86dfe4fb142dfa
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jan 5 06:24:41 2023 -0500
Improve arrow flight batch splitting and naming (#3444)
* Improve arrow flight batch splitting and naming
* Review feedback: rename to max_flight_data_size
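
    For reference, a minimal usage sketch of the renamed builder API (the
    input batch, the enclosing async function, and the final collect are
    illustrative, not part of this change):

        use arrow_array::RecordBatch;
        use arrow_flight::encode::FlightDataEncoderBuilder;
        use arrow_flight::error::Result;
        use futures::TryStreamExt;

        // Assumes an async context and an existing `batch: RecordBatch`.
        async fn encode_one_batch(batch: RecordBatch) -> Result<()> {
            let input = futures::stream::iter(vec![Ok(batch)]);

            // Target roughly 2MB per encoded FlightData message; larger
            // RecordBatches are split (preserving order) to stay near this size.
            let flight_data_stream = FlightDataEncoderBuilder::new()
                .with_max_flight_data_size(2 * 1024 * 1024)
                .build(input);

            // Each item is a FlightData message ready to send over gRPC.
            let _flight_data: Vec<_> = flight_data_stream.try_collect().await?;
            Ok(())
        }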
---
arrow-flight/src/encode.rs | 104 ++++++++++++++++++++++++++----------
arrow-flight/tests/encode_decode.rs | 8 +--
2 files changed, 80 insertions(+), 32 deletions(-)
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 7c339b67d..55000bba2 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -63,24 +63,25 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// [`FlightError`]: crate::error::FlightError
#[derive(Debug)]
pub struct FlightDataEncoderBuilder {
- /// The maximum message size (see details on [`Self::with_max_message_size`]).
- max_batch_size: usize,
+ /// The maximum approximate target message size in bytes
+ /// (see details on [`Self::with_max_flight_data_size`]).
+ max_flight_data_size: usize,
/// Ipc writer options
options: IpcWriteOptions,
/// Metadata to add to the schema message
app_metadata: Bytes,
}
-/// Default target size for record batches to send.
+/// Default target size for encoded [`FlightData`].
///
/// Note this value would normally be 4MB, but the size calculation is
/// somewhat inexact, so we set it to 2MB.
-pub const GRPC_TARGET_MAX_BATCH_SIZE: usize = 2097152;
+pub const GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES: usize = 2097152;
impl Default for FlightDataEncoderBuilder {
fn default() -> Self {
Self {
- max_batch_size: GRPC_TARGET_MAX_BATCH_SIZE,
+ max_flight_data_size: GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES,
options: IpcWriteOptions::default(),
app_metadata: Bytes::new(),
}
@@ -92,16 +93,18 @@ impl FlightDataEncoderBuilder {
Self::default()
}
- /// Set the (approximate) maximum encoded [`RecordBatch`] size to
- /// limit the gRPC message size. Defaults to 2MB.
+ /// Set the (approximate) maximum size, in bytes, of the
+ /// [`FlightData`] produced by this encoder. Defaults to 2MB.
///
- /// The encoder splits up [`RecordBatch`]s (preserving order) to
- /// limit individual messages to approximately this size. The size
- /// is approximate because there additional encoding overhead on
- /// top of the underlying data itself.
+ /// Since there is often a maximum message size for gRPC messages
+ /// (typically around 4MB), this encoder splits up [`RecordBatch`]s
+ /// (preserving order) into multiple [`FlightData`] objects to
+ /// limit the size of individual messages sent via gRPC.
///
- pub fn with_max_message_size(mut self, max_batch_size: usize) -> Self {
- self.max_batch_size = max_batch_size;
+ /// The size is approximate because of the additional encoding
+ /// overhead on top of the underlying data buffers themselves.
+ pub fn with_max_flight_data_size(mut self, max_flight_data_size: usize) -> Self {
+ self.max_flight_data_size = max_flight_data_size;
self
}
@@ -126,12 +129,12 @@ impl FlightDataEncoderBuilder {
S: Stream<Item = Result<RecordBatch>> + Send + 'static,
{
let Self {
- max_batch_size,
+ max_flight_data_size,
options,
app_metadata,
} = self;
- FlightDataEncoder::new(input.boxed(), max_batch_size, options, app_metadata)
+ FlightDataEncoder::new(input.boxed(), max_flight_data_size, options, app_metadata)
}
}
@@ -143,29 +146,30 @@ pub struct FlightDataEncoder {
inner: BoxStream<'static, Result<RecordBatch>>,
/// schema, set after the first batch
schema: Option<SchemaRef>,
- /// Max size of batches to encode
- max_batch_size: usize,
+ /// Target maximum size of flight data
+ /// (see details on [`FlightDataEncoderBuilder::with_max_flight_data_size`]).
+ max_flight_data_size: usize,
/// do the encoding / tracking of dictionaries
encoder: FlightIpcEncoder,
/// optional metadata to add to schema FlightData
app_metadata: Option<Bytes>,
/// data queued up to send but not yet sent
queue: VecDeque<FlightData>,
- /// Is this strema done (inner is empty or errored)
+ /// Is this stream done (inner is empty or errored)
done: bool,
}
impl FlightDataEncoder {
fn new(
inner: BoxStream<'static, Result<RecordBatch>>,
- max_batch_size: usize,
+ max_flight_data_size: usize,
options: IpcWriteOptions,
app_metadata: Bytes,
) -> Self {
Self {
inner,
schema: None,
- max_batch_size,
+ max_flight_data_size,
encoder: FlightIpcEncoder::new(options),
app_metadata: Some(app_metadata),
queue: VecDeque::new(),
@@ -210,7 +214,7 @@ impl FlightDataEncoder {
// encode the batch
let batch = prepare_batch_for_flight(&batch, schema)?;
- for batch in split_batch_for_grpc_response(batch, self.max_batch_size) {
+ for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) =
self.encoder.encode_batch(&batch)?;
@@ -300,7 +304,7 @@ fn prepare_schema_for_flight(schema: &Schema) -> Schema {
/// arrays: <https://github.com/apache/arrow-rs/issues/3407>
fn split_batch_for_grpc_response(
batch: RecordBatch,
- max_batch_size: usize,
+ max_flight_data_size: usize,
) -> Vec<RecordBatch> {
let size = batch
.columns()
@@ -308,8 +312,9 @@ fn split_batch_for_grpc_response(
.map(|col| col.get_buffer_memory_size())
.sum::<usize>();
- let n_batches =
- (size / max_batch_size + usize::from(size % max_batch_size != 0)).max(1);
+ let n_batches = (size / max_flight_data_size
+ + usize::from(size % max_flight_data_size != 0))
+ .max(1);
let rows_per_batch = (batch.num_rows() / n_batches).max(1);
let mut out = Vec::with_capacity(n_batches + 1);
@@ -419,6 +424,7 @@ mod tests {
array::{UInt32Array, UInt8Array},
compute::concat_batches,
};
+ use arrow_array::UInt64Array;
use super::*;
@@ -480,24 +486,24 @@ mod tests {
#[test]
fn test_split_batch_for_grpc_response() {
- let max_batch_size = 1024;
+ let max_flight_data_size = 1024;
// no split
let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
.expect("cannot create record batch");
- let split = split_batch_for_grpc_response(batch.clone(), max_batch_size);
+ let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
assert_eq!(split.len(), 1);
assert_eq!(batch, split[0]);
// split once
- let n_rows = max_batch_size + 1;
+ let n_rows = max_flight_data_size + 1;
assert!(n_rows % 2 == 1, "should be an odd number");
let c =
UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::<Vec<_>>());
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
.expect("cannot create record batch");
- let split = split_batch_for_grpc_response(batch.clone(), max_batch_size);
+ let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
assert_eq!(split.len(), 3);
assert_eq!(
split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
@@ -506,6 +512,48 @@ mod tests {
assert_eq!(concat_batches(&batch.schema(), &split).unwrap(), batch);
}
+ #[test]
+ fn test_split_batch_for_grpc_response_sizes() {
+ // 2000 8 byte entries into 2k pieces: 8 chunks of 250 rows
+ verify_split(2000, 2 * 1024, vec![250, 250, 250, 250, 250, 250, 250, 250]);
+
+ // 2000 8 byte entries into 4k pieces: 4 chunks of 500 rows
+ verify_split(2000, 4 * 1024, vec![500, 500, 500, 500]);
+
+ // 2023 8 byte entries into 3k pieces does not divide evenly
+ verify_split(2023, 3 * 1024, vec![337, 337, 337, 337, 337, 337, 1]);
+
+ // 10 8 byte entries into 1 byte pieces means each row gets its own
+ verify_split(10, 1, vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]);
+
+ // 10 8 byte entries into 1k byte pieces means one piece
+ verify_split(10, 1024, vec![10]);
+ }
+
+ /// Creates a UInt64Array of 8 byte integers with `num_input_rows` rows,
+ /// splits it into pieces limited to `max_flight_data_size_bytes`, and
+ /// verifies the row counts in those pieces
+ fn verify_split(
+ num_input_rows: u64,
+ max_flight_data_size_bytes: usize,
+ expected_sizes: Vec<usize>,
+ ) {
+ let array: UInt64Array = (0..num_input_rows).collect();
+
+ let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)])
+ .expect("cannot create record batch");
+
+ let input_rows = batch.num_rows();
+
+ let split =
+ split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes);
+ let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect();
+ let output_rows: usize = sizes.iter().sum();
+
+ assert_eq!(sizes, expected_sizes, "mismatch for {batch:?}");
+ assert_eq!(input_rows, output_rows, "mismatch for {batch:?}");
+ }
+
// test sending record batches
// test sending record batches with multiple different dictionaries
}
diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs
index 45b8c0bf5..0aa987687 100644
--- a/arrow-flight/tests/encode_decode.rs
+++ b/arrow-flight/tests/encode_decode.rs
@@ -131,7 +131,7 @@ async fn test_max_message_size() {
let input_batch_stream = futures::stream::iter(vec![Ok(make_primative_batch(5))]);
// 5 input rows, with a very small limit should result in 5 batch messages
- let encoder = FlightDataEncoderBuilder::default().with_max_message_size(1);
+ let encoder = FlightDataEncoderBuilder::default().with_max_flight_data_size(1);
let encode_stream = encoder.build(input_batch_stream);
@@ -164,9 +164,9 @@ async fn test_max_message_size_fuzz() {
make_primative_batch(127),
];
- for max_message_size in [10, 1024, 2048, 6400, 3211212] {
- let encoder =
- FlightDataEncoderBuilder::default().with_max_message_size(max_message_size);
+ for max_message_size_bytes in [10, 1024, 2048, 6400, 3211212] {
+ let encoder = FlightDataEncoderBuilder::default()
+ .with_max_flight_data_size(max_message_size_bytes);
let input_batch_stream = futures::stream::iter(input.clone()).map(Ok);
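
    The row-count expectations in the new test_split_batch_for_grpc_response_sizes
    cases follow from the chunking arithmetic in split_batch_for_grpc_response; a
    standalone sketch of that arithmetic (the helper name is illustrative, and the
    buffer size is approximated as 8 bytes per UInt64 row, ignoring allocation
    rounding):

        // Approximate the per-chunk row counts that split_batch_for_grpc_response
        // produces for a single UInt64 column.
        fn approx_chunk_rows(num_rows: usize, max_flight_data_size: usize) -> Vec<usize> {
            let size = num_rows * 8; // 8 bytes per UInt64 value
            let n_batches = (size / max_flight_data_size
                + usize::from(size % max_flight_data_size != 0))
            .max(1);
            let rows_per_batch = (num_rows / n_batches).max(1);

            let mut out = Vec::with_capacity(n_batches + 1);
            let mut offset = 0;
            while offset < num_rows {
                let rows = rows_per_batch.min(num_rows - offset);
                out.push(rows);
                offset += rows;
            }
            out
        }

        // 2023 rows * 8 bytes = 16184 bytes with a 3072 byte target gives
        // n_batches = 6 and rows_per_batch = 337: six chunks of 337 rows plus
        // a trailing chunk with the single leftover row.
        fn main() {
            assert_eq!(
                approx_chunk_rows(2023, 3 * 1024),
                vec![337, 337, 337, 337, 337, 337, 1]
            );
        }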