Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/05 11:24:46 UTC

[arrow-rs] branch master updated: Improve arrow flight batch splitting and naming (#3444)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new e256e3de0 Improve arrow flight batch splitting and naming (#3444)
e256e3de0 is described below

commit e256e3de0c902a98dc15a13cfc86dfe4fb142dfa
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jan 5 06:24:41 2023 -0500

    Improve arrow flight batch splitting and naming (#3444)
    
    * Improve arrow flight batch splitting and naming
    
    * Review feedback: rename to max_flight_data_size
---
 arrow-flight/src/encode.rs          | 104 ++++++++++++++++++++++++++----------
 arrow-flight/tests/encode_decode.rs |   8 +--
 2 files changed, 80 insertions(+), 32 deletions(-)

diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 7c339b67d..55000bba2 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -63,24 +63,25 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
 /// [`FlightError`]: crate::error::FlightError
 #[derive(Debug)]
 pub struct FlightDataEncoderBuilder {
-    /// The maximum message size (see details on [`Self::with_max_message_size`]).
-    max_batch_size: usize,
+    /// The maximum approximate target message size in bytes
+    /// (see details on [`Self::with_max_flight_data_size`]).
+    max_flight_data_size: usize,
     /// Ipc writer options
     options: IpcWriteOptions,
     /// Metadata to add to the schema message
     app_metadata: Bytes,
 }
 
-/// Default target size for record batches to send.
+/// Default target size for encoded [`FlightData`].
 ///
 /// Note this value would normally be 4MB, but the size calculation is
 /// somewhat inexact, so we set it to 2MB.
-pub const GRPC_TARGET_MAX_BATCH_SIZE: usize = 2097152;
+pub const GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES: usize = 2097152;
 
 impl Default for FlightDataEncoderBuilder {
     fn default() -> Self {
         Self {
-            max_batch_size: GRPC_TARGET_MAX_BATCH_SIZE,
+            max_flight_data_size: GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES,
             options: IpcWriteOptions::default(),
             app_metadata: Bytes::new(),
         }
@@ -92,16 +93,18 @@ impl FlightDataEncoderBuilder {
         Self::default()
     }
 
-    /// Set the (approximate) maximum encoded [`RecordBatch`] size to
-    /// limit the gRPC message size. Defaults to 2MB.
+    /// Set the (approximate) maximum size, in bytes, of the
+    /// [`FlightData`] produced by this encoder. Defaults to 2MB.
     ///
-    /// The encoder splits up [`RecordBatch`]s (preserving order) to
-    /// limit individual messages to approximately this size. The size
-    /// is approximate because there additional encoding overhead on
-    /// top of the underlying data itself.
+    /// Since gRPC typically enforces a maximum message size
+    /// (often around 4MB), this encoder splits up [`RecordBatch`]s
+    /// (preserving order) into multiple [`FlightData`] objects to
+    /// limit the size of individual messages sent via gRPC.
     ///
-    pub fn with_max_message_size(mut self, max_batch_size: usize) -> Self {
-        self.max_batch_size = max_batch_size;
+    /// The size is approximate because of the additional encoding
+    /// overhead on top of the underlying data buffers themselves.
+    pub fn with_max_flight_data_size(mut self, max_flight_data_size: usize) -> Self {
+        self.max_flight_data_size = max_flight_data_size;
         self
     }
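
For reference, a minimal sketch of how the renamed builder method is used.
Note that `input_batches` is an assumed stream of `Result<RecordBatch>`
values and is not part of this diff:

    use arrow_flight::encode::FlightDataEncoderBuilder;

    // `input_batches`: any Stream<Item = Result<RecordBatch, FlightError>>
    // (assumed here; e.g. futures::stream::iter over prebuilt batches).
    let flight_data_stream = FlightDataEncoderBuilder::new()
        .with_max_flight_data_size(1024 * 1024) // target ~1MB messages
        .build(input_batches);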
 
@@ -126,12 +129,12 @@ impl FlightDataEncoderBuilder {
         S: Stream<Item = Result<RecordBatch>> + Send + 'static,
     {
         let Self {
-            max_batch_size,
+            max_flight_data_size,
             options,
             app_metadata,
         } = self;
 
-        FlightDataEncoder::new(input.boxed(), max_batch_size, options, app_metadata)
+        FlightDataEncoder::new(input.boxed(), max_flight_data_size, options, app_metadata)
     }
 }
 
@@ -143,29 +146,30 @@ pub struct FlightDataEncoder {
     inner: BoxStream<'static, Result<RecordBatch>>,
     /// schema, set after the first batch
     schema: Option<SchemaRef>,
-    /// Max size of batches to encode
-    max_batch_size: usize,
+    /// Target maximum size of flight data
+    /// (see details on [`FlightDataEncoderBuilder::with_max_flight_data_size`]).
+    max_flight_data_size: usize,
     /// do the encoding / tracking of dictionaries
     encoder: FlightIpcEncoder,
     /// optional metadata to add to schema FlightData
     app_metadata: Option<Bytes>,
     /// data queued up to send but not yet sent
     queue: VecDeque<FlightData>,
-    /// Is this strema done (inner is empty or errored)
+    /// Is this stream done (inner is empty or errored)
     done: bool,
 }
 
 impl FlightDataEncoder {
     fn new(
         inner: BoxStream<'static, Result<RecordBatch>>,
-        max_batch_size: usize,
+        max_flight_data_size: usize,
         options: IpcWriteOptions,
         app_metadata: Bytes,
     ) -> Self {
         Self {
             inner,
             schema: None,
-            max_batch_size,
+            max_flight_data_size,
             encoder: FlightIpcEncoder::new(options),
             app_metadata: Some(app_metadata),
             queue: VecDeque::new(),
@@ -210,7 +214,7 @@ impl FlightDataEncoder {
         // encode the batch
         let batch = prepare_batch_for_flight(&batch, schema)?;
 
-        for batch in split_batch_for_grpc_response(batch, self.max_batch_size) {
+        for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) {
             let (flight_dictionaries, flight_batch) =
                 self.encoder.encode_batch(&batch)?;
 
@@ -300,7 +304,7 @@ fn prepare_schema_for_flight(schema: &Schema) -> Schema {
 /// arrays: <https://github.com/apache/arrow-rs/issues/3407>
 fn split_batch_for_grpc_response(
     batch: RecordBatch,
-    max_batch_size: usize,
+    max_flight_data_size: usize,
 ) -> Vec<RecordBatch> {
     let size = batch
         .columns()
@@ -308,8 +312,9 @@ fn split_batch_for_grpc_response(
         .map(|col| col.get_buffer_memory_size())
         .sum::<usize>();
 
-    let n_batches =
-        (size / max_batch_size + usize::from(size % max_batch_size != 0)).max(1);
+    let n_batches = (size / max_flight_data_size
+        + usize::from(size % max_flight_data_size != 0))
+    .max(1);
     let rows_per_batch = (batch.num_rows() / n_batches).max(1);
     let mut out = Vec::with_capacity(n_batches + 1);
 
@@ -419,6 +424,7 @@ mod tests {
         array::{UInt32Array, UInt8Array},
         compute::concat_batches,
     };
+    use arrow_array::UInt64Array;
 
     use super::*;
 
@@ -480,24 +486,24 @@ mod tests {
 
     #[test]
     fn test_split_batch_for_grpc_response() {
-        let max_batch_size = 1024;
+        let max_flight_data_size = 1024;
 
         // no split
         let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), max_batch_size);
+        let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
         assert_eq!(split.len(), 1);
         assert_eq!(batch, split[0]);
 
         // split once
-        let n_rows = max_batch_size + 1;
+        let n_rows = max_flight_data_size + 1;
         assert!(n_rows % 2 == 1, "should be an odd number");
         let c =
             UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::<Vec<_>>());
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), max_batch_size);
+        let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
         assert_eq!(split.len(), 3);
         assert_eq!(
             split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
@@ -506,6 +512,48 @@ mod tests {
         assert_eq!(concat_batches(&batch.schema(), &split).unwrap(), batch);
     }
 
+    #[test]
+    fn test_split_batch_for_grpc_response_sizes() {
+        // 2000 8 byte entries into 2k pieces: 8 chunks of 250 rows
+        verify_split(2000, 2 * 1024, vec![250, 250, 250, 250, 250, 250, 250, 250]);
+
+        // 2000 8 byte entries into 4k pieces: 4 chunks of 500 rows
+        verify_split(2000, 4 * 1024, vec![500, 500, 500, 500]);
+
+        // 2023 8 byte entries into 3k pieces does not divide evenly
+        verify_split(2023, 3 * 1024, vec![337, 337, 337, 337, 337, 337, 1]);
+
+        // 10 8 byte entries into 1 byte pieces means each row gets its own piece
+        verify_split(10, 1, vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]);
+
+        // 10 8 byte entries into 1k byte pieces means one piece
+        verify_split(10, 1024, vec![10]);
+    }
+
+    /// Creates a UInt64Array of 8 byte integers with `num_input_rows` rows,
+    /// splits it into `max_flight_data_size_bytes` sized pieces, and
+    /// verifies the row counts in those pieces
+    fn verify_split(
+        num_input_rows: u64,
+        max_flight_data_size_bytes: usize,
+        expected_sizes: Vec<usize>,
+    ) {
+        let array: UInt64Array = (0..num_input_rows).collect();
+
+        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)])
+            .expect("cannot create record batch");
+
+        let input_rows = batch.num_rows();
+
+        let split =
+            split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes);
+        let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect();
+        let output_rows: usize = sizes.iter().sum();
+
+        assert_eq!(sizes, expected_sizes, "mismatch for {batch:?}");
+        assert_eq!(input_rows, output_rows, "mismatch for {batch:?}");
+    }
+
     // test sending record batches
     // test sending record batches with multiple different dictionaries
 }
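
The integration tests below exercise the rename end-to-end. A rough
round-trip sketch, where `input_batch_stream` is the test stream from the
surrounding file, and the decoder entry point shown
(`FlightRecordBatchStream::new_from_flight_data` from
`arrow_flight::decode`) is an assumption about the companion decode API,
not part of this diff:

    use arrow_flight::decode::FlightRecordBatchStream;
    use arrow_flight::encode::FlightDataEncoderBuilder;

    // Encode with a deliberately small target size so batches split...
    let encode_stream = FlightDataEncoderBuilder::default()
        .with_max_flight_data_size(1024)
        .build(input_batch_stream);

    // ...then decode the FlightData messages back into RecordBatches;
    // concatenated, they should equal the original input.
    let decode_stream = FlightRecordBatchStream::new_from_flight_data(encode_stream);
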
diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs
index 45b8c0bf5..0aa987687 100644
--- a/arrow-flight/tests/encode_decode.rs
+++ b/arrow-flight/tests/encode_decode.rs
@@ -131,7 +131,7 @@ async fn test_max_message_size() {
     let input_batch_stream = futures::stream::iter(vec![Ok(make_primative_batch(5))]);
 
     // 5 input rows, with a very small limit should result in 5 batch messages
-    let encoder = FlightDataEncoderBuilder::default().with_max_message_size(1);
+    let encoder = FlightDataEncoderBuilder::default().with_max_flight_data_size(1);
 
     let encode_stream = encoder.build(input_batch_stream);
 
@@ -164,9 +164,9 @@ async fn test_max_message_size_fuzz() {
         make_primative_batch(127),
     ];
 
-    for max_message_size in [10, 1024, 2048, 6400, 3211212] {
-        let encoder =
-            FlightDataEncoderBuilder::default().with_max_message_size(max_message_size);
+    for max_message_size_bytes in [10, 1024, 2048, 6400, 3211212] {
+        let encoder = FlightDataEncoderBuilder::default()
+            .with_max_flight_data_size(max_message_size_bytes);
 
         let input_batch_stream = futures::stream::iter(input.clone()).map(Ok);