You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/25 14:40:11 UTC
[arrow-rs] branch master updated: feat: set FlightDescriptor on FlightDataEncoderBuilder (#4101)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 005904976 feat: set FlightDescriptor on FlightDataEncoderBuilder (#4101)
005904976 is described below
commit 0059049768035486470c99ee2b9675f0983ced32
Author: Alex Huang <hu...@gmail.com>
AuthorDate: Tue Apr 25 16:40:04 2023 +0200
feat: set FlightDescriptor on FlightDataEncoderBuilder (#4101)
* feat: set FlightDescriptor on FlightDataEncoderBuilder
* send a separate descriptor message when the descriptor is provided
* include the flight descriptor in the first FlightData
---
arrow-flight/src/encode.rs | 26 ++++++++++++++++++++++++--
arrow-flight/tests/encode_decode.rs | 25 +++++++++++++++++++++++++
2 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index f97311d6f..9650031d8 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -17,7 +17,7 @@
use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
-use crate::{error::Result, FlightData, SchemaAsIpc};
+use crate::{error::Result, FlightData, FlightDescriptor, SchemaAsIpc};
use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
@@ -72,6 +72,8 @@ pub struct FlightDataEncoderBuilder {
app_metadata: Bytes,
/// Optional schema, if known before data.
schema: Option<SchemaRef>,
+ /// Optional flight descriptor, if known before data.
+ descriptor: Option<FlightDescriptor>,
}
/// Default target size for encoded [`FlightData`].
@@ -87,6 +89,7 @@ impl Default for FlightDataEncoderBuilder {
options: IpcWriteOptions::default(),
app_metadata: Bytes::new(),
schema: None,
+ descriptor: None,
}
}
}
@@ -134,6 +137,15 @@ impl FlightDataEncoderBuilder {
self
}
+ /// Specify a flight descriptor to attach to the first FlightData message.
+ pub fn with_flight_descriptor(
+ mut self,
+ descriptor: Option<FlightDescriptor>,
+ ) -> Self {
+ self.descriptor = descriptor;
+ self
+ }
+
/// Return a [`Stream`](futures::Stream) of [`FlightData`],
/// consuming self. More details on [`FlightDataEncoder`]
pub fn build<S>(self, input: S) -> FlightDataEncoder
@@ -145,6 +157,7 @@ impl FlightDataEncoderBuilder {
options,
app_metadata,
schema,
+ descriptor,
} = self;
FlightDataEncoder::new(
@@ -153,6 +166,7 @@ impl FlightDataEncoderBuilder {
max_flight_data_size,
options,
app_metadata,
+ descriptor,
)
}
}
@@ -176,6 +190,8 @@ pub struct FlightDataEncoder {
queue: VecDeque<FlightData>,
/// Is this stream done (inner is empty or errored)
done: bool,
+ /// Optional flight descriptor; attached to the first FlightData message queued, then cleared
+ descriptor: Option<FlightDescriptor>,
}
impl FlightDataEncoder {
@@ -185,6 +201,7 @@ impl FlightDataEncoder {
max_flight_data_size: usize,
options: IpcWriteOptions,
app_metadata: Bytes,
+ descriptor: Option<FlightDescriptor>,
) -> Self {
let mut encoder = Self {
inner,
@@ -194,17 +211,22 @@ impl FlightDataEncoder {
app_metadata: Some(app_metadata),
queue: VecDeque::new(),
done: false,
+ descriptor,
};
// If schema is known up front, enqueue it immediately
if let Some(schema) = schema {
encoder.encode_schema(&schema);
}
+
encoder
}
/// Place the `FlightData` in the queue to send
- fn queue_message(&mut self, data: FlightData) {
+ fn queue_message(&mut self, mut data: FlightData) {
+ if let Some(descriptor) = self.descriptor.take() {
+ data.flight_descriptor = Some(descriptor);
+ }
self.queue.push_back(data);
}
diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs
index 90fa2b7a6..4f1a8e667 100644
--- a/arrow-flight/tests/encode_decode.rs
+++ b/arrow-flight/tests/encode_decode.rs
@@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc};
use arrow_array::types::Int32Type;
use arrow_array::{ArrayRef, DictionaryArray, Float64Array, RecordBatch, UInt8Array};
use arrow_cast::pretty::pretty_format_batches;
+use arrow_flight::flight_descriptor::DescriptorType;
+use arrow_flight::FlightDescriptor;
use arrow_flight::{
decode::{DecodedPayload, FlightDataDecoder, FlightRecordBatchStream},
encode::FlightDataEncoderBuilder,
@@ -136,6 +138,29 @@ async fn test_zero_batches_schema_specified() {
assert_eq!(decoder.schema(), Some(&schema));
}
+#[tokio::test]
+async fn test_with_flight_descriptor() {
+ let stream = futures::stream::iter(vec![Ok(make_dictionary_batch(5))]);
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
+
+ let descriptor = Some(FlightDescriptor {
+ r#type: DescriptorType::Path.into(),
+ path: vec!["table_name".to_string()],
+ cmd: Bytes::default(),
+ });
+
+ let encoder = FlightDataEncoderBuilder::default()
+ .with_schema(schema.clone())
+ .with_flight_descriptor(descriptor.clone());
+
+ let mut encoder = encoder.build(stream);
+
+ // First message should be the schema, carrying the flight descriptor
+ let first_batch = encoder.next().await.unwrap().unwrap();
+
+ assert_eq!(first_batch.flight_descriptor, descriptor);
+}
+
#[tokio::test]
async fn test_zero_batches_dictionary_schema_specified() {
let schema = Arc::new(Schema::new(vec![