You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/30 12:53:26 UTC
[arrow-rs] branch master updated: Add Builder style APIs and docs for `FlightData`, `FlightInfo`, `FlightEndpoint`, `Location` and `Ticket` (#4294)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 1b409a184 Add Builder style APIs and docs for `FlightData`, `FlightInfo`, `FlightEndpoint`, `Location` and `Ticket` (#4294)
1b409a184 is described below
commit 1b409a184f114c0b4a21f6f50081310d30ed0eaf
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue May 30 08:53:20 2023 -0400
Add Builder style APIs and docs for `FlightData`, `FlightInfo`, `FlightEndpoint`, `Location` and `Ticket` (#4294)
* Add Builder style APIs and docs for FlightData, FlightInfo, FlightEndpoint, Location and Ticket
* fix and clippy
* Rename `FlightInfo::with_schema` to `FlightInfo::try_with_schema`
---
arrow-flight/examples/flight_sql_server.rs | 62 ++------
arrow-flight/src/lib.rs | 221 ++++++++++++++++++++++++----
arrow-flight/tests/flight_sql_client_cli.rs | 64 ++++----
3 files changed, 232 insertions(+), 115 deletions(-)
diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs
index 27ae5d854..783e0bf5b 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -29,7 +29,6 @@ use tonic::{Request, Response, Status, Streaming};
use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
-use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::sql::sql_info::SqlInfoList;
use arrow_flight::sql::{
server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
@@ -222,26 +221,15 @@ impl FlightSqlService for FlightSqlServiceImpl {
ticket: Some(ticket),
location: vec![loc],
};
- let endpoints = vec![endpoint];
+ let info = FlightInfo::new()
+ .try_with_schema(&schema)
+ .map_err(|e| status!("Unable to serialize schema", e))?
+ .with_descriptor(FlightDescriptor::new_cmd(vec![]))
+ .with_endpoint(endpoint)
+ .with_total_records(num_rows as i64)
+ .with_total_bytes(num_bytes as i64)
+ .with_ordered(false);
- let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
- .try_into()
- .map_err(|e| status!("Unable to serialize schema", e))?;
- let IpcMessage(schema_bytes) = message;
-
- let flight_desc = FlightDescriptor {
- r#type: DescriptorType::Cmd.into(),
- cmd: Default::default(),
- path: vec![],
- };
- let info = FlightInfo {
- schema: schema_bytes,
- flight_descriptor: Some(flight_desc),
- endpoint: endpoints,
- total_records: num_rows as i64,
- total_bytes: num_bytes as i64,
- ordered: false,
- };
let resp = Response::new(info);
Ok(resp)
}
@@ -292,32 +280,14 @@ impl FlightSqlService for FlightSqlServiceImpl {
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
let flight_descriptor = request.into_inner();
- let ticket = Ticket {
- ticket: query.encode_to_vec().into(),
- };
-
- let options = IpcWriteOptions::default();
-
- // encode the schema into the correct form
- let IpcMessage(schema) = SchemaAsIpc::new(SqlInfoList::schema(), &options)
- .try_into()
- .expect("valid sql_info schema");
-
- let endpoint = vec![FlightEndpoint {
- ticket: Some(ticket),
- // we assume users wnating to use this helper would reasonably
- // never need to be distributed across multile endpoints?
- location: vec![],
- }];
-
- let flight_info = FlightInfo {
- schema,
- flight_descriptor: Some(flight_descriptor),
- endpoint,
- total_records: -1,
- total_bytes: -1,
- ordered: false,
- };
+ let ticket = Ticket::new(query.encode_to_vec());
+ let endpoint = FlightEndpoint::new().with_ticket(ticket);
+
+ let flight_info = FlightInfo::new()
+ .try_with_schema(SqlInfoList::schema())
+ .map_err(|e| status!("Unable to encode schema", e))?
+ .with_endpoint(endpoint)
+ .with_descriptor(flight_descriptor);
Ok(tonic::Response::new(flight_info))
}
diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs
index f7df32a20..4163f2cea 100644
--- a/arrow-flight/src/lib.rs
+++ b/arrow-flight/src/lib.rs
@@ -26,7 +26,8 @@
//! This crate contains:
//!
//! 1. Low level [prost] generated structs
-//! for Flight gRPC protobuf messages, such as [`FlightData`].
+//! for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`],
+//! [`Location`] and [`Ticket`].
//!
//! 2. Low level [tonic] generated [`flight_service_client`] and
//! [`flight_service_server`].
@@ -390,21 +391,51 @@ impl FlightData {
/// See [`FlightDataEncoderBuilder`] for a higher level API to
/// convert a stream of [`RecordBatch`]es to [`FlightData`]s
///
+ /// # Example:
+ ///
+ /// ```
+ /// # use bytes::Bytes;
+ /// # use arrow_flight::{FlightData, FlightDescriptor};
+ /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data
+ /// // Get encoded Arrow IPC data:
+ /// let data_body: Bytes = encode_data();
+ /// // Create the FlightData message
+ /// let flight_data = FlightData::new()
+ /// .with_descriptor(FlightDescriptor::new_cmd("the command"))
+ /// .with_app_metadata("My apps metadata")
+ /// .with_data_body(data_body);
+ /// ```
+ ///
/// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder
/// [`RecordBatch`]: arrow_array::RecordBatch
- pub fn new(
- flight_descriptor: Option<FlightDescriptor>,
- message: IpcMessage,
- app_metadata: impl Into<Bytes>,
- data_body: impl Into<Bytes>,
- ) -> Self {
- let IpcMessage(vals) = message;
- FlightData {
- flight_descriptor,
- data_header: vals,
- app_metadata: app_metadata.into(),
- data_body: data_body.into(),
- }
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Add a [`FlightDescriptor`] describing the data
+ pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
+ self.flight_descriptor = Some(flight_descriptor);
+ self
+ }
+
+ /// Add a data header
+ pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
+ self.data_header = data_header.into();
+ self
+ }
+
+ /// Add a data body. See [`IpcDataGenerator`] to create this data.
+ ///
+ /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator
+ pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
+ self.data_body = data_body.into();
+ self
+ }
+
+ /// Add optional application specific metadata to the message
+ pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
+ self.app_metadata = app_metadata.into();
+ self
}
}
@@ -433,24 +464,45 @@ impl FlightDescriptor {
}
impl FlightInfo {
- /// Create a new [`FlightInfo`] that describes the access
- /// coordinates for retrieval of a dataset.
- pub fn new(
- message: IpcMessage,
- flight_descriptor: Option<FlightDescriptor>,
- endpoint: Vec<FlightEndpoint>,
- total_records: i64,
- total_bytes: i64,
- ordered: bool,
- ) -> Self {
- let IpcMessage(vals) = message;
+ /// Create a new, empty `FlightInfo`, describing where to fetch flight data
+ ///
+ ///
+ /// # Example:
+ /// ```
+ /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint};
+ /// # use arrow_schema::{Schema, Field, DataType};
+ /// # fn get_schema() -> Schema {
+ /// # Schema::new(vec![
+ /// # Field::new("a", DataType::Utf8, false),
+ /// # ])
+ /// # }
+ /// #
+ /// // Create a new FlightInfo
+ /// let flight_info = FlightInfo::new()
+ /// // Encode the Arrow schema
+ /// .try_with_schema(&get_schema())
+ /// .expect("encoding failed")
+ /// .with_descriptor(
+ /// FlightDescriptor::new_cmd("a command")
+ /// )
+ /// .with_endpoint(
+ /// FlightEndpoint::new()
+ /// .with_ticket(Ticket::new("ticket contents")
+ /// )
+ /// )
+ /// .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"));
+ /// ```
+ pub fn new() -> FlightInfo {
FlightInfo {
- schema: vals,
- flight_descriptor,
- endpoint,
- total_records,
- total_bytes,
- ordered,
+ schema: Bytes::new(),
+ flight_descriptor: None,
+ endpoint: vec![],
+ ordered: false,
+ // Flight says "Set these to -1 if unknown."
+ //
+ // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289
+ total_records: -1,
+ total_bytes: -1,
}
}
@@ -459,6 +511,51 @@ impl FlightInfo {
let msg = IpcMessage(self.schema);
msg.try_into()
}
+
+ /// Specify the schema for the response.
+ ///
+ /// Note this takes the arrow [`Schema`] (not the IPC schema) and
+ /// encodes it using the default IPC options.
+ ///
+ /// Returns an error if `schema` can not be encoded into IPC form.
+ pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
+ let options = IpcWriteOptions::default();
+ let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
+ self.schema = schema;
+ Ok(self)
+ }
+
+ /// Add specific a endpoint for fetching the data
+ pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
+ self.endpoint.push(endpoint);
+ self
+ }
+
+ /// Add a [`FlightDescriptor`] describing what this data is
+ pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
+ self.flight_descriptor = Some(flight_descriptor);
+ self
+ }
+
+ /// Set the number of records in the result, if known
+ pub fn with_total_records(mut self, total_records: i64) -> Self {
+ self.total_records = total_records;
+ self
+ }
+
+ /// Set the number of bytes in the result, if known
+ pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
+ self.total_bytes = total_bytes;
+ self
+ }
+
+ /// Specify if the response is [ordered] across endpoints
+ ///
+ /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275
+ pub fn with_ordered(mut self, ordered: bool) -> Self {
+ self.ordered = ordered;
+ self
+ }
}
impl<'a> SchemaAsIpc<'a> {
@@ -486,6 +583,68 @@ impl Result {
}
}
+impl Ticket {
+ /// Create a new `Ticket`
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use arrow_flight::Ticket;
+ /// let ticket = Ticket::new("SELECT * from FOO");
+ /// ```
+ pub fn new(ticket: impl Into<Bytes>) -> Self {
+ Self {
+ ticket: ticket.into(),
+ }
+ }
+}
+
+impl FlightEndpoint {
+ /// Create a new, empty `FlightEndpoint` that represents a location
+ /// to retrieve Flight results.
+ ///
+ /// # Example
+ /// ```
+ /// # use arrow_flight::{FlightEndpoint, Ticket};
+ /// #
+ /// // Specify the client should fetch results from this server
+ /// let endpoint = FlightEndpoint::new()
+ /// .with_ticket(Ticket::new("the ticket"));
+ ///
+ /// // Specify the client should fetch results from either
+ /// // `http://example.com` or `https://example.com`
+ /// let endpoint = FlightEndpoint::new()
+ /// .with_ticket(Ticket::new("the ticket"))
+ /// .with_location("http://example.com")
+ /// .with_location("https://example.com");
+ /// ```
+ pub fn new() -> FlightEndpoint {
+ Default::default()
+ }
+
+ /// Set the [`Ticket`] used to retrieve data from the endpoint
+ pub fn with_ticket(mut self, ticket: Ticket) -> Self {
+ self.ticket = Some(ticket);
+ self
+ }
+
+ /// Add a location `uri` to this endpoint. Note each endpoint can
+ /// have multiple locations.
+ ///
+ /// If no `uri` is specified, the [Flight Spec] says:
+ ///
+ /// ```text
+ /// * If the list is empty, the expectation is that the ticket can only
+ /// * be redeemed on the current service where the ticket was
+ /// * generated.
+ /// ```
+ /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312
+ pub fn with_location(mut self, uri: impl Into<String>) -> Self {
+ self.location.push(Location { uri: uri.into() });
+ self
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/arrow-flight/tests/flight_sql_client_cli.rs b/arrow-flight/tests/flight_sql_client_cli.rs
index 9b3baca9b..c4ae9280c 100644
--- a/arrow-flight/tests/flight_sql_client_cli.rs
+++ b/arrow-flight/tests/flight_sql_client_cli.rs
@@ -36,9 +36,8 @@ use arrow_flight::{
},
utils::batches_to_flight_data,
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
- HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket,
+ HandshakeResponse, Ticket,
};
-use arrow_ipc::writer::IpcWriteOptions;
use arrow_schema::{ArrowError, DataType, Field, Schema};
use assert_cmd::Command;
use futures::Stream;
@@ -167,42 +166,31 @@ impl FlightSqlService for FlightSqlServiceImpl {
let batch = Self::fake_result().unwrap();
- let IpcMessage(schema_bytes) =
- SchemaAsIpc::new(batch.schema().as_ref(), &IpcWriteOptions::default())
- .try_into()
- .unwrap();
-
- let info = FlightInfo {
- schema: schema_bytes,
- flight_descriptor: None,
- endpoint: vec![
- FlightEndpoint {
- ticket: Some(Ticket {
- ticket: FetchResults {
- handle: String::from("part_1"),
- }
- .as_any()
- .encode_to_vec()
- .into(),
- }),
- location: vec![],
- },
- FlightEndpoint {
- ticket: Some(Ticket {
- ticket: FetchResults {
- handle: String::from("part_2"),
- }
- .as_any()
- .encode_to_vec()
- .into(),
- }),
- location: vec![],
- },
- ],
- total_records: batch.num_rows() as i64,
- total_bytes: batch.get_array_memory_size() as i64,
- ordered: false,
- };
+ let info = FlightInfo::new()
+ .try_with_schema(&batch.schema())
+ .expect("encoding schema")
+ .with_endpoint(
+ FlightEndpoint::new().with_ticket(Ticket::new(
+ FetchResults {
+ handle: String::from("part_1"),
+ }
+ .as_any()
+ .encode_to_vec(),
+ )),
+ )
+ .with_endpoint(
+ FlightEndpoint::new().with_ticket(Ticket::new(
+ FetchResults {
+ handle: String::from("part_2"),
+ }
+ .as_any()
+ .encode_to_vec(),
+ )),
+ )
+ .with_total_records(batch.num_rows() as i64)
+ .with_total_bytes(batch.get_array_memory_size() as i64)
+ .with_ordered(false);
+
let resp = Response::new(info);
Ok(resp)
}