You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/08 13:51:17 UTC

[arrow-rs] branch master updated: feat(flight): harmonize server metadata APIs (#4384)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ad218fa99 feat(flight): harmonize server metadata APIs (#4384)
ad218fa99 is described below

commit ad218fa99f4f6f0bc8b58ee95ad9f389c5f23b21
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Thu Jun 8 15:51:10 2023 +0200

    feat(flight): harmonize server metadata APIs (#4384)
    
    * feat(flight): harmonize server metadata APIs
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * chore: fmt
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 arrow-flight/examples/flight_sql_server.rs |  26 +--
 arrow-flight/src/sql/metadata/mod.rs       |   2 +-
 arrow-flight/src/sql/metadata/sql_info.rs  | 249 +++++++++++++++++------------
 3 files changed, 165 insertions(+), 112 deletions(-)

diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs
index e9dba08f0..f717d9b62 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -31,7 +31,8 @@ use arrow_array::builder::StringBuilder;
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::sql::metadata::{
-    SqlInfoList, XdbcTypeInfo, XdbcTypeInfoData, XdbcTypeInfoDataBuilder,
+    SqlInfoData, SqlInfoDataBuilder, XdbcTypeInfo, XdbcTypeInfoData,
+    XdbcTypeInfoDataBuilder,
 };
 use arrow_flight::sql::{
     server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
@@ -66,13 +67,14 @@ const FAKE_TOKEN: &str = "uuid_token";
 const FAKE_HANDLE: &str = "uuid_handle";
 const FAKE_UPDATE_RESULT: i64 = 1;
 
-static INSTANCE_SQL_INFO: Lazy<SqlInfoList> = Lazy::new(|| {
-    SqlInfoList::new()
-        // Server information
-        .with_sql_info(SqlInfo::FlightSqlServerName, "Example Flight SQL Server")
-        .with_sql_info(SqlInfo::FlightSqlServerVersion, "1")
-        // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
-        .with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3")
+static INSTANCE_SQL_DATA: Lazy<SqlInfoData> = Lazy::new(|| {
+    let mut builder = SqlInfoDataBuilder::new();
+    // Server information
+    builder.append(SqlInfo::FlightSqlServerName, "Example Flight SQL Server");
+    builder.append(SqlInfo::FlightSqlServerVersion, "1");
+    // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
+    builder.append(SqlInfo::FlightSqlServerArrowVersion, "1.3");
+    builder.build().unwrap() // panics on first access of this Lazy if build() returns Err
 });
 
 static INSTANCE_XBDC_DATA: Lazy<XdbcTypeInfoData> = Lazy::new(|| {
@@ -345,7 +347,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let endpoint = FlightEndpoint::new().with_ticket(ticket);
 
         let flight_info = FlightInfo::new()
-            .try_with_schema(SqlInfoList::schema())
+            .try_with_schema(query.into_builder(&INSTANCE_SQL_DATA).schema().as_ref())
             .map_err(|e| status!("Unable to encode schema", e))?
             .with_endpoint(endpoint)
             .with_descriptor(flight_descriptor);
@@ -532,9 +534,11 @@ impl FlightSqlService for FlightSqlServiceImpl {
         query: CommandGetSqlInfo,
         _request: Request<Ticket>,
     ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        let batch = INSTANCE_SQL_INFO.filter(&query.info).encode();
+        let builder = query.into_builder(&INSTANCE_SQL_DATA);
+        let schema = builder.schema();
+        let batch = builder.build();
         let stream = FlightDataEncoderBuilder::new()
-            .with_schema(Arc::new(SqlInfoList::schema().clone()))
+            .with_schema(schema)
             .build(futures::stream::once(async { batch }))
             .map_err(Status::from);
         Ok(Response::new(Box::pin(stream)))
diff --git a/arrow-flight/src/sql/metadata/mod.rs b/arrow-flight/src/sql/metadata/mod.rs
index b823c1f4a..72c882f38 100644
--- a/arrow-flight/src/sql/metadata/mod.rs
+++ b/arrow-flight/src/sql/metadata/mod.rs
@@ -34,7 +34,7 @@ mod xdbc_info;
 
 pub use catalogs::GetCatalogsBuilder;
 pub use db_schemas::GetDbSchemasBuilder;
-pub use sql_info::SqlInfoList;
+pub use sql_info::{SqlInfoData, SqlInfoDataBuilder}; // NOTE(review): GetSqlInfoBuilder (returned by CommandGetSqlInfo::into_builder) is not re-exported here; confirm callers can name it
 pub use tables::GetTablesBuilder;
 pub use xdbc_info::{XdbcTypeInfo, XdbcTypeInfoData, XdbcTypeInfoDataBuilder};
 
diff --git a/arrow-flight/src/sql/metadata/sql_info.rs b/arrow-flight/src/sql/metadata/sql_info.rs
index 4b4604078..d0c9cedbc 100644
--- a/arrow-flight/src/sql/metadata/sql_info.rs
+++ b/arrow-flight/src/sql/metadata/sql_info.rs
@@ -15,26 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`SqlInfoList`] for building responses to [`CommandGetSqlInfo`] queries.
+//! Helpers for building responses to [`CommandGetSqlInfo`] metadata requests.
+//!
+//! - [`SqlInfoDataBuilder`] - a builder for collecting sql infos
+//!   and building a conformant `RecordBatch` with sql info server metadata.
+//! - [`SqlInfoData`] - a helper type wrapping a `RecordBatch`
+//!   used for storing sql info server metadata.
+//! - [`GetSqlInfoBuilder`] - a builder for constructing [`CommandGetSqlInfo`] responses.
 //!
-//! [`CommandGetSqlInfo`]: crate::sql::CommandGetSqlInfo
 
-use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
-use arrow_array::array::{Array, UnionArray};
+use arrow_arith::boolean::or;
+use arrow_array::array::{Array, UInt32Array, UnionArray};
 use arrow_array::builder::{
     ArrayBuilder, BooleanBuilder, Int32Builder, Int64Builder, Int8Builder, ListBuilder,
     MapBuilder, StringBuilder, UInt32Builder,
 };
+use arrow_array::cast::downcast_array;
 use arrow_array::RecordBatch;
 use arrow_data::ArrayData;
-use arrow_schema::{DataType, Field, Fields, Schema, UnionFields, UnionMode};
+use arrow_ord::comparison::eq_scalar;
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, UnionFields, UnionMode};
+use arrow_select::filter::filter_record_batch;
 use once_cell::sync::Lazy;
 
 use crate::error::Result;
-use crate::sql::SqlInfo;
+use crate::sql::{CommandGetSqlInfo, SqlInfo};
 
 /// Represents a dynamic value
 #[derive(Debug, Clone, PartialEq)]
@@ -321,39 +329,15 @@ impl SqlInfoUnionBuilder {
     }
 }
 
-/// A builder for [`CommandGetSqlInfo`] response.
+/// A builder for [`SqlInfoData`], the server metadata used to answer [`CommandGetSqlInfo`] requests.
 ///
 /// [`CommandGetSqlInfo`] are metadata requests used by a Flight SQL
-/// server to communicate supported capabilities to Flight SQL
-/// clients.
-///
-/// Servers construct a [`SqlInfoList`] by adding infos via
-/// [`with_sql_info`] and build the response using [`encode`].
-///
-/// The available configuration options are defined in the [Flight SQL protos][protos].
-///
-/// # Example
-/// ```
-/// # use arrow_flight::sql::{metadata::SqlInfoList, SqlInfo, SqlSupportedTransaction};
-/// // Create the list of metadata describing the server
-/// let info_list = SqlInfoList::new()
-///     .with_sql_info(SqlInfo::FlightSqlServerName, "server name")
-///     // ... add other SqlInfo here ..
-///     .with_sql_info(
-///         SqlInfo::FlightSqlServerTransaction,
-///         SqlSupportedTransaction::Transaction as i32,
-///     );
+/// server to communicate supported capabilities to Flight SQL clients.
 ///
-/// // Create the batch to send back to the client
-/// let batch = info_list.encode().unwrap();
-/// ```
-///
-/// [protos]: https://github.com/apache/arrow/blob/6d3d2fca2c9693231fa1e52c142ceef563fc23f9/format/FlightSql.proto#L71-L820
-/// [`CommandGetSqlInfo`]: crate::sql::CommandGetSqlInfo
-/// [`with_sql_info`]: SqlInfoList::with_sql_info
-/// [`encode`]: SqlInfoList::encode
+/// Servers construct (usually static) [`SqlInfoData`] via this [`SqlInfoDataBuilder`],
+/// and build responses with [`CommandGetSqlInfo::into_builder`], which returns a [`GetSqlInfoBuilder`].
 #[derive(Debug, Clone, PartialEq)]
-pub struct SqlInfoList {
+pub struct SqlInfoDataBuilder {
     /// Use BTreeMap to ensure the values are sorted by value as
     /// to make output consistent
     ///
@@ -362,13 +346,13 @@ pub struct SqlInfoList {
     infos: BTreeMap<u32, SqlInfoValue>,
 }
 
-impl Default for SqlInfoList {
+impl Default for SqlInfoDataBuilder {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl SqlInfoList {
+impl SqlInfoDataBuilder {
     pub fn new() -> Self {
         Self {
             infos: BTreeMap::new(),
@@ -376,40 +360,23 @@ impl SqlInfoList {
     }
 
     /// register the specific sql metadata item
-    pub fn with_sql_info(
-        mut self,
-        name: impl SqlInfoName,
-        value: impl Into<SqlInfoValue>,
-    ) -> Self {
+    pub fn append(&mut self, name: impl SqlInfoName, value: impl Into<SqlInfoValue>) {
         self.infos.insert(name.as_u32(), value.into());
-        self
-    }
-
-    /// Filter this info list keeping only the info values specified
-    /// in `infos`.
-    ///
-    /// Returns self if infos is empty (no filtering)
-    pub fn filter(&self, info: &[u32]) -> Cow<'_, Self> {
-        if info.is_empty() {
-            Cow::Borrowed(self)
-        } else {
-            let infos: BTreeMap<_, _> = info
-                .iter()
-                .filter_map(|name| self.infos.get(name).map(|v| (*name, v.clone())))
-                .collect();
-            Cow::Owned(Self { infos })
-        }
     }
 
     /// Encode the contents of this list according to the [FlightSQL spec]
     ///
     /// [FlightSQL spec]: (https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43
-    pub fn encode(&self) -> Result<RecordBatch> {
+    pub fn build(self) -> Result<SqlInfoData> {
         let mut name_builder = UInt32Builder::new();
         let mut value_builder = SqlInfoUnionBuilder::new();
 
-        for (&name, value) in self.infos.iter() {
-            name_builder.append_value(name);
+        let mut names: Vec<_> = self.infos.keys().cloned().collect();
+        names.sort_unstable(); // NOTE(review): redundant, `infos` is a BTreeMap so its keys are already sorted
+
+        for key in names {
+            let (name, value) = self.infos.get_key_value(&key).unwrap(); // `key` was taken from `self.infos`, so it is present
+            name_builder.append_value(*name);
             value_builder.append_value(value)?
         }
 
@@ -417,7 +384,8 @@ impl SqlInfoList {
             ("info_name", Arc::new(name_builder.finish()) as _),
             ("value", Arc::new(value_builder.finish()) as _),
         ])?;
-        Ok(batch)
+
+        Ok(SqlInfoData { batch })
     }
 
     /// Return the [`Schema`] for a GetSchema RPC call with [`crate::sql::CommandGetSqlInfo`]
@@ -427,7 +395,89 @@ impl SqlInfoList {
     }
 }
 
-// The schema produced by [`SqlInfoList`]
+/// Sql info server metadata (wrapping a [`RecordBatch`]) used to answer [`CommandGetSqlInfo`] requests, see [protos].
+///
+/// # Example
+/// ```
+/// # use arrow_flight::sql::{metadata::SqlInfoDataBuilder, SqlInfo, SqlSupportedTransaction};
+/// // Create the list of metadata describing the server
+/// let mut builder = SqlInfoDataBuilder::new();
+/// builder.append(SqlInfo::FlightSqlServerName, "server name");
+/// // ... add other SqlInfo here ..
+/// builder.append(
+///     SqlInfo::FlightSqlServerTransaction,
+///     SqlSupportedTransaction::Transaction as i32,
+/// );
+///
+/// // Build the SqlInfoData; per-request responses are filtered from it
+/// let info_data = builder.build().unwrap();
+/// ```
+///
+/// [protos]: https://github.com/apache/arrow/blob/6d3d2fca2c9693231fa1e52c142ceef563fc23f9/format/FlightSql.proto#L71-L820
+pub struct SqlInfoData {
+    batch: RecordBatch,
+}
+
+impl SqlInfoData {
+    /// Return a [`RecordBatch`] containing only the rows whose `info_name` is in `info`;
+    /// if `info` is empty, all rows are returned (as for an empty [`CommandGetSqlInfo`] request)
+    pub fn record_batch(
+        &self,
+        info: impl IntoIterator<Item = u32>,
+    ) -> Result<RecordBatch> {
+        let arr: UInt32Array = downcast_array(self.batch.column(0).as_ref());
+        let type_filter = info
+            .into_iter()
+            .map(|tt| eq_scalar(&arr, tt))
+            .collect::<std::result::Result<Vec<_>, _>>()?
+            .into_iter()
+            // The masks have the same length as they are all produced from the same column
+            .reduce(|filter, arr| or(&filter, &arr).unwrap());
+        if let Some(filter) = type_filter {
+            Ok(filter_record_batch(&self.batch, &filter)?)
+        } else {
+            Ok(self.batch.clone())
+        }
+    }
+
+    /// Return the schema of the RecordBatch that will be returned
+    /// from [`CommandGetSqlInfo`]
+    pub fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
+/// A builder for a [`CommandGetSqlInfo`] response.
+pub struct GetSqlInfoBuilder<'a> {
+    /// Requested `SqlInfo` ids; if empty, all infos are returned.
+    info: Vec<u32>,
+    infos: &'a SqlInfoData,
+}
+
+impl CommandGetSqlInfo {
+    /// Create a builder suitable for constructing a response
+    pub fn into_builder(self, infos: &SqlInfoData) -> GetSqlInfoBuilder {
+        GetSqlInfoBuilder {
+            info: self.info,
+            infos,
+        }
+    }
+}
+
+impl GetSqlInfoBuilder<'_> {
+    /// Builds a `RecordBatch` with the correct schema for a [`CommandGetSqlInfo`] response
+    pub fn build(self) -> Result<RecordBatch> {
+        self.infos.record_batch(self.info)
+    }
+
+    /// Return the schema of the RecordBatch that will be returned
+    /// from [`CommandGetSqlInfo`]
+    pub fn schema(&self) -> SchemaRef {
+        self.infos.schema()
+    }
+}
+
+// The schema produced by [`SqlInfoData`]
 static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
     Schema::new(vec![
         Field::new("info_name", DataType::UInt32, false),
@@ -439,7 +489,7 @@ static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
 mod tests {
     use std::collections::HashMap;
 
-    use super::SqlInfoList;
+    use super::SqlInfoDataBuilder;
     use crate::sql::metadata::tests::assert_batches_eq;
     use crate::sql::{
         SqlInfo, SqlNullOrdering, SqlSupportedTransaction, SqlSupportsConvert,
@@ -456,23 +506,23 @@ mod tests {
             ],
         );
 
-        let batch = SqlInfoList::new()
-            // str
-            .with_sql_info(SqlInfo::SqlIdentifierQuoteChar, r#"""#)
-            // bool
-            .with_sql_info(SqlInfo::SqlDdlCatalog, false)
-            // i32
-            .with_sql_info(
-                SqlInfo::SqlNullOrdering,
-                SqlNullOrdering::SqlNullsSortedHigh as i32,
-            )
-            // i64
-            .with_sql_info(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64)
-            // [str]
-            .with_sql_info(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str])
-            .with_sql_info(SqlInfo::SqlSupportsConvert, &convert)
-            .encode()
-            .unwrap();
+        let mut builder = SqlInfoDataBuilder::new();
+        // str
+        builder.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#);
+        // bool
+        builder.append(SqlInfo::SqlDdlCatalog, false);
+        // i32
+        builder.append(
+            SqlInfo::SqlNullOrdering,
+            SqlNullOrdering::SqlNullsSortedHigh as i32,
+        );
+        // i64
+        builder.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64);
+        // [str]
+        builder.append(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str]);
+        builder.append(SqlInfo::SqlSupportsConvert, &convert);
+
+        let batch = builder.build().unwrap().record_batch(None).unwrap();
 
         let expected = vec![
             "+-----------+----------------------------------------+",
@@ -492,27 +542,26 @@ mod tests {
 
     #[test]
     fn test_filter_sql_infos() {
-        let info_list = SqlInfoList::new()
-            .with_sql_info(SqlInfo::FlightSqlServerName, "server name")
-            .with_sql_info(
-                SqlInfo::FlightSqlServerTransaction,
-                SqlSupportedTransaction::Transaction as i32,
-            );
-
-        let batch = info_list.encode().unwrap();
+        let mut builder = SqlInfoDataBuilder::new();
+        builder.append(SqlInfo::FlightSqlServerName, "server name");
+        builder.append(
+            SqlInfo::FlightSqlServerTransaction,
+            SqlSupportedTransaction::Transaction as i32,
+        );
+        let data = builder.build().unwrap();
+
+        let batch = data.record_batch(None).unwrap(); // no ids requested: every info is returned
         assert_eq!(batch.num_rows(), 2);
 
-        let batch = info_list
-            .filter(&[SqlInfo::FlightSqlServerTransaction as u32])
-            .encode()
-            .unwrap();
-        let ref_batch = SqlInfoList::new()
-            .with_sql_info(
-                SqlInfo::FlightSqlServerTransaction,
-                SqlSupportedTransaction::Transaction as i32,
-            )
-            .encode()
+        let batch = data
+            .record_batch([SqlInfo::FlightSqlServerTransaction as u32])
             .unwrap();
+        let mut ref_builder = SqlInfoDataBuilder::new();
+        ref_builder.append(
+            SqlInfo::FlightSqlServerTransaction,
+            SqlSupportedTransaction::Transaction as i32,
+        );
+        let ref_batch = ref_builder.build().unwrap().record_batch(None).unwrap();
 
         assert_eq!(batch, ref_batch);
     }