You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/30 12:53:26 UTC

[arrow-rs] branch master updated: Add Builder style APIs and docs for `FlightData`, `FlightInfo`, `FlightEndpoint`, `Location` and `Ticket` (#4294)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b409a184 Add Builder style APIs and docs for `FlightData`, `FlightInfo`, `FlightEndpoint`, `Location` and `Ticket` (#4294)
1b409a184 is described below

commit 1b409a184f114c0b4a21f6f50081310d30ed0eaf
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue May 30 08:53:20 2023 -0400

    Add Builder style APIs and docs for `FlightData`, `FlightInfo`, `FlightEndpoint`, `Location` and `Ticket` (#4294)
    
    * Add Builder style APIs and docs for FlightData, FlightInfo, FlightEndpoint, Location and Ticket
    
    * fix and clippy
    
    * Rename `FlightInfo::with_schema` to `FlightInfo::try_with_schema`
---
 arrow-flight/examples/flight_sql_server.rs  |  62 ++------
 arrow-flight/src/lib.rs                     | 221 ++++++++++++++++++++++++----
 arrow-flight/tests/flight_sql_client_cli.rs |  64 ++++----
 3 files changed, 232 insertions(+), 115 deletions(-)

diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs
index 27ae5d854..783e0bf5b 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -29,7 +29,6 @@ use tonic::{Request, Response, Status, Streaming};
 use arrow_array::builder::StringBuilder;
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_flight::encode::FlightDataEncoderBuilder;
-use arrow_flight::flight_descriptor::DescriptorType;
 use arrow_flight::sql::sql_info::SqlInfoList;
 use arrow_flight::sql::{
     server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
@@ -222,26 +221,15 @@ impl FlightSqlService for FlightSqlServiceImpl {
             ticket: Some(ticket),
             location: vec![loc],
         };
-        let endpoints = vec![endpoint];
+        let info = FlightInfo::new()
+            .try_with_schema(&schema)
+            .map_err(|e| status!("Unable to serialize schema", e))?
+            .with_descriptor(FlightDescriptor::new_cmd(vec![]))
+            .with_endpoint(endpoint)
+            .with_total_records(num_rows as i64)
+            .with_total_bytes(num_bytes as i64)
+            .with_ordered(false);
 
-        let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
-            .try_into()
-            .map_err(|e| status!("Unable to serialize schema", e))?;
-        let IpcMessage(schema_bytes) = message;
-
-        let flight_desc = FlightDescriptor {
-            r#type: DescriptorType::Cmd.into(),
-            cmd: Default::default(),
-            path: vec![],
-        };
-        let info = FlightInfo {
-            schema: schema_bytes,
-            flight_descriptor: Some(flight_desc),
-            endpoint: endpoints,
-            total_records: num_rows as i64,
-            total_bytes: num_bytes as i64,
-            ordered: false,
-        };
         let resp = Response::new(info);
         Ok(resp)
     }
@@ -292,32 +280,14 @@ impl FlightSqlService for FlightSqlServiceImpl {
         request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
         let flight_descriptor = request.into_inner();
-        let ticket = Ticket {
-            ticket: query.encode_to_vec().into(),
-        };
-
-        let options = IpcWriteOptions::default();
-
-        // encode the schema into the correct form
-        let IpcMessage(schema) = SchemaAsIpc::new(SqlInfoList::schema(), &options)
-            .try_into()
-            .expect("valid sql_info schema");
-
-        let endpoint = vec![FlightEndpoint {
-            ticket: Some(ticket),
-            // we assume users wnating to use this helper would reasonably
-            // never need to be distributed across multile endpoints?
-            location: vec![],
-        }];
-
-        let flight_info = FlightInfo {
-            schema,
-            flight_descriptor: Some(flight_descriptor),
-            endpoint,
-            total_records: -1,
-            total_bytes: -1,
-            ordered: false,
-        };
+        let ticket = Ticket::new(query.encode_to_vec());
+        let endpoint = FlightEndpoint::new().with_ticket(ticket);
+
+        let flight_info = FlightInfo::new()
+            .try_with_schema(SqlInfoList::schema())
+            .map_err(|e| status!("Unable to encode schema", e))?
+            .with_endpoint(endpoint)
+            .with_descriptor(flight_descriptor);
 
         Ok(tonic::Response::new(flight_info))
     }
diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs
index f7df32a20..4163f2cea 100644
--- a/arrow-flight/src/lib.rs
+++ b/arrow-flight/src/lib.rs
@@ -26,7 +26,8 @@
 //! This crate contains:
 //!
 //! 1. Low level [prost] generated structs
-//!  for Flight gRPC protobuf messages, such as [`FlightData`].
+//!  for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`],
+//!  [`Location`] and [`Ticket`].
 //!
 //! 2. Low level [tonic] generated [`flight_service_client`] and
 //! [`flight_service_server`].
@@ -390,21 +391,51 @@ impl FlightData {
     /// See [`FlightDataEncoderBuilder`] for a higher level API to
     /// convert a stream of [`RecordBatch`]es to [`FlightData`]s
     ///
+    /// # Example:
+    ///
+    /// ```
+    /// # use bytes::Bytes;
+    /// # use arrow_flight::{FlightData, FlightDescriptor};
+    /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data
+    /// // Get encoded Arrow IPC data:
+    /// let data_body: Bytes = encode_data();
+    /// // Create the FlightData message
+    /// let flight_data = FlightData::new()
+    ///   .with_descriptor(FlightDescriptor::new_cmd("the command"))
+    ///   .with_app_metadata("My apps metadata")
+    ///   .with_data_body(data_body);
+    /// ```
+    ///
     /// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder
     /// [`RecordBatch`]: arrow_array::RecordBatch
-    pub fn new(
-        flight_descriptor: Option<FlightDescriptor>,
-        message: IpcMessage,
-        app_metadata: impl Into<Bytes>,
-        data_body: impl Into<Bytes>,
-    ) -> Self {
-        let IpcMessage(vals) = message;
-        FlightData {
-            flight_descriptor,
-            data_header: vals,
-            app_metadata: app_metadata.into(),
-            data_body: data_body.into(),
-        }
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add a [`FlightDescriptor`] describing the data
+    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
+        self.flight_descriptor = Some(flight_descriptor);
+        self
+    }
+
+    /// Add a data header
+    pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
+        self.data_header = data_header.into();
+        self
+    }
+
+    /// Add a data body. See [`IpcDataGenerator`] to create this data.
+    ///
+    /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator
+    pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
+        self.data_body = data_body.into();
+        self
+    }
+
+    /// Add optional application specific metadata to the message
+    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
+        self.app_metadata = app_metadata.into();
+        self
     }
 }
 
@@ -433,24 +464,45 @@ impl FlightDescriptor {
 }
 
 impl FlightInfo {
-    /// Create a new [`FlightInfo`] that describes the access
-    /// coordinates for retrieval of a dataset.
-    pub fn new(
-        message: IpcMessage,
-        flight_descriptor: Option<FlightDescriptor>,
-        endpoint: Vec<FlightEndpoint>,
-        total_records: i64,
-        total_bytes: i64,
-        ordered: bool,
-    ) -> Self {
-        let IpcMessage(vals) = message;
+    /// Create a new, empty `FlightInfo`, describing where to fetch flight data
+    ///
+    ///
+    /// # Example:
+    /// ```
+    /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint};
+    /// # use arrow_schema::{Schema, Field, DataType};
+    /// # fn get_schema() -> Schema {
+    /// #   Schema::new(vec![
+    /// #     Field::new("a", DataType::Utf8, false),
+    /// #   ])
+    /// # }
+    /// #
+    /// // Create a new FlightInfo
+    /// let flight_info = FlightInfo::new()
+    ///   // Encode the Arrow schema
+    ///   .try_with_schema(&get_schema())
+    ///   .expect("encoding failed")
+    ///   .with_descriptor(
+    ///      FlightDescriptor::new_cmd("a command")
+    ///   )
+    ///   .with_endpoint(
+    ///      FlightEndpoint::new()
+    ///        .with_ticket(Ticket::new("ticket contents")
+    ///      )
+    ///    )
+    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"));
+    /// ```
+    pub fn new() -> FlightInfo {
         FlightInfo {
-            schema: vals,
-            flight_descriptor,
-            endpoint,
-            total_records,
-            total_bytes,
-            ordered,
+            schema: Bytes::new(),
+            flight_descriptor: None,
+            endpoint: vec![],
+            ordered: false,
+            // Flight says "Set these to -1 if unknown."
+            //
+            // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289
+            total_records: -1,
+            total_bytes: -1,
         }
     }
 
@@ -459,6 +511,51 @@ impl FlightInfo {
         let msg = IpcMessage(self.schema);
         msg.try_into()
     }
+
+    /// Specify the schema for the response.
+    ///
+    /// Note this takes the arrow [`Schema`] (not the IPC schema) and
+    /// encodes it using the default IPC options.
+    ///
+    /// Returns an error if `schema` can not be encoded into IPC form.
+    pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
+        let options = IpcWriteOptions::default();
+        let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
+        self.schema = schema;
+        Ok(self)
+    }
+
+    /// Add a specific endpoint for fetching the data
+    pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
+        self.endpoint.push(endpoint);
+        self
+    }
+
+    /// Add a [`FlightDescriptor`] describing what this data is
+    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
+        self.flight_descriptor = Some(flight_descriptor);
+        self
+    }
+
+    /// Set the number of records in the result, if known
+    pub fn with_total_records(mut self, total_records: i64) -> Self {
+        self.total_records = total_records;
+        self
+    }
+
+    /// Set the number of bytes in the result, if known
+    pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
+        self.total_bytes = total_bytes;
+        self
+    }
+
+    /// Specify if the response is [ordered] across endpoints
+    ///
+    /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275
+    pub fn with_ordered(mut self, ordered: bool) -> Self {
+        self.ordered = ordered;
+        self
+    }
 }
 
 impl<'a> SchemaAsIpc<'a> {
@@ -486,6 +583,68 @@ impl Result {
     }
 }
 
+impl Ticket {
+    /// Create a new `Ticket`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use arrow_flight::Ticket;
+    /// let ticket = Ticket::new("SELECT * from FOO");
+    /// ```
+    pub fn new(ticket: impl Into<Bytes>) -> Self {
+        Self {
+            ticket: ticket.into(),
+        }
+    }
+}
+
+impl FlightEndpoint {
+    /// Create a new, empty `FlightEndpoint` that represents a location
+    /// to retrieve Flight results.
+    ///
+    /// # Example
+    /// ```
+    /// # use arrow_flight::{FlightEndpoint, Ticket};
+    /// #
+    /// // Specify the client should fetch results from this server
+    /// let endpoint = FlightEndpoint::new()
+    ///   .with_ticket(Ticket::new("the ticket"));
+    ///
+    /// // Specify the client should fetch results from either
+    /// // `http://example.com` or `https://example.com`
+    /// let endpoint = FlightEndpoint::new()
+    ///   .with_ticket(Ticket::new("the ticket"))
+    ///   .with_location("http://example.com")
+    ///   .with_location("https://example.com");
+    /// ```
+    pub fn new() -> FlightEndpoint {
+        Default::default()
+    }
+
+    /// Set the [`Ticket`] used to retrieve data from the endpoint
+    pub fn with_ticket(mut self, ticket: Ticket) -> Self {
+        self.ticket = Some(ticket);
+        self
+    }
+
+    /// Add a location `uri` to this endpoint. Note each endpoint can
+    /// have multiple locations.
+    ///
+    /// If no `uri` is specified, the [Flight Spec] says:
+    ///
+    /// ```text
+    /// * If the list is empty, the expectation is that the ticket can only
+    /// * be redeemed on the current service where the ticket was
+    /// * generated.
+    /// ```
+    /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312
+    pub fn with_location(mut self, uri: impl Into<String>) -> Self {
+        self.location.push(Location { uri: uri.into() });
+        self
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/arrow-flight/tests/flight_sql_client_cli.rs b/arrow-flight/tests/flight_sql_client_cli.rs
index 9b3baca9b..c4ae9280c 100644
--- a/arrow-flight/tests/flight_sql_client_cli.rs
+++ b/arrow-flight/tests/flight_sql_client_cli.rs
@@ -36,9 +36,8 @@ use arrow_flight::{
     },
     utils::batches_to_flight_data,
     Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
-    HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket,
+    HandshakeResponse, Ticket,
 };
-use arrow_ipc::writer::IpcWriteOptions;
 use arrow_schema::{ArrowError, DataType, Field, Schema};
 use assert_cmd::Command;
 use futures::Stream;
@@ -167,42 +166,31 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
         let batch = Self::fake_result().unwrap();
 
-        let IpcMessage(schema_bytes) =
-            SchemaAsIpc::new(batch.schema().as_ref(), &IpcWriteOptions::default())
-                .try_into()
-                .unwrap();
-
-        let info = FlightInfo {
-            schema: schema_bytes,
-            flight_descriptor: None,
-            endpoint: vec![
-                FlightEndpoint {
-                    ticket: Some(Ticket {
-                        ticket: FetchResults {
-                            handle: String::from("part_1"),
-                        }
-                        .as_any()
-                        .encode_to_vec()
-                        .into(),
-                    }),
-                    location: vec![],
-                },
-                FlightEndpoint {
-                    ticket: Some(Ticket {
-                        ticket: FetchResults {
-                            handle: String::from("part_2"),
-                        }
-                        .as_any()
-                        .encode_to_vec()
-                        .into(),
-                    }),
-                    location: vec![],
-                },
-            ],
-            total_records: batch.num_rows() as i64,
-            total_bytes: batch.get_array_memory_size() as i64,
-            ordered: false,
-        };
+        let info = FlightInfo::new()
+            .try_with_schema(&batch.schema())
+            .expect("encoding schema")
+            .with_endpoint(
+                FlightEndpoint::new().with_ticket(Ticket::new(
+                    FetchResults {
+                        handle: String::from("part_1"),
+                    }
+                    .as_any()
+                    .encode_to_vec(),
+                )),
+            )
+            .with_endpoint(
+                FlightEndpoint::new().with_ticket(Ticket::new(
+                    FetchResults {
+                        handle: String::from("part_2"),
+                    }
+                    .as_any()
+                    .encode_to_vec(),
+                )),
+            )
+            .with_total_records(batch.num_rows() as i64)
+            .with_total_bytes(batch.get_array_memory_size() as i64)
+            .with_ordered(false);
+
         let resp = Response::new(info);
         Ok(resp)
     }