You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/08 13:51:17 UTC
[arrow-rs] branch master updated: feat(flight): harmonize server metadata APIs (#4384)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ad218fa99 feat(flight): harmonize server metadata APIs (#4384)
ad218fa99 is described below
commit ad218fa99f4f6f0bc8b58ee95ad9f389c5f23b21
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Thu Jun 8 15:51:10 2023 +0200
feat(flight): harmonize server metadata APIs (#4384)
* feat(flight): harmonize server metadata APIs
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* chore: fmt
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
arrow-flight/examples/flight_sql_server.rs | 26 +--
arrow-flight/src/sql/metadata/mod.rs | 2 +-
arrow-flight/src/sql/metadata/sql_info.rs | 249 +++++++++++++++++------------
3 files changed, 165 insertions(+), 112 deletions(-)
diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs
index e9dba08f0..f717d9b62 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -31,7 +31,8 @@ use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::sql::metadata::{
- SqlInfoList, XdbcTypeInfo, XdbcTypeInfoData, XdbcTypeInfoDataBuilder,
+ SqlInfoData, SqlInfoDataBuilder, XdbcTypeInfo, XdbcTypeInfoData,
+ XdbcTypeInfoDataBuilder,
};
use arrow_flight::sql::{
server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
@@ -66,13 +67,14 @@ const FAKE_TOKEN: &str = "uuid_token";
const FAKE_HANDLE: &str = "uuid_handle";
const FAKE_UPDATE_RESULT: i64 = 1;
-static INSTANCE_SQL_INFO: Lazy<SqlInfoList> = Lazy::new(|| {
- SqlInfoList::new()
- // Server information
- .with_sql_info(SqlInfo::FlightSqlServerName, "Example Flight SQL Server")
- .with_sql_info(SqlInfo::FlightSqlServerVersion, "1")
- // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
- .with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3")
+static INSTANCE_SQL_DATA: Lazy<SqlInfoData> = Lazy::new(|| {
+ let mut builder = SqlInfoDataBuilder::new();
+ // Server information
+ builder.append(SqlInfo::FlightSqlServerName, "Example Flight SQL Server");
+ builder.append(SqlInfo::FlightSqlServerVersion, "1");
+ // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
+ builder.append(SqlInfo::FlightSqlServerArrowVersion, "1.3");
+ builder.build().unwrap()
});
static INSTANCE_XBDC_DATA: Lazy<XdbcTypeInfoData> = Lazy::new(|| {
@@ -345,7 +347,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let endpoint = FlightEndpoint::new().with_ticket(ticket);
let flight_info = FlightInfo::new()
- .try_with_schema(SqlInfoList::schema())
+ .try_with_schema(query.into_builder(&INSTANCE_SQL_DATA).schema().as_ref())
.map_err(|e| status!("Unable to encode schema", e))?
.with_endpoint(endpoint)
.with_descriptor(flight_descriptor);
@@ -532,9 +534,11 @@ impl FlightSqlService for FlightSqlServiceImpl {
query: CommandGetSqlInfo,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- let batch = INSTANCE_SQL_INFO.filter(&query.info).encode();
+ let builder = query.into_builder(&INSTANCE_SQL_DATA);
+ let schema = builder.schema();
+ let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
- .with_schema(Arc::new(SqlInfoList::schema().clone()))
+ .with_schema(schema)
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
diff --git a/arrow-flight/src/sql/metadata/mod.rs b/arrow-flight/src/sql/metadata/mod.rs
index b823c1f4a..72c882f38 100644
--- a/arrow-flight/src/sql/metadata/mod.rs
+++ b/arrow-flight/src/sql/metadata/mod.rs
@@ -34,7 +34,7 @@ mod xdbc_info;
pub use catalogs::GetCatalogsBuilder;
pub use db_schemas::GetDbSchemasBuilder;
-pub use sql_info::SqlInfoList;
+pub use sql_info::{SqlInfoData, SqlInfoDataBuilder};
pub use tables::GetTablesBuilder;
pub use xdbc_info::{XdbcTypeInfo, XdbcTypeInfoData, XdbcTypeInfoDataBuilder};
diff --git a/arrow-flight/src/sql/metadata/sql_info.rs b/arrow-flight/src/sql/metadata/sql_info.rs
index 4b4604078..d0c9cedbc 100644
--- a/arrow-flight/src/sql/metadata/sql_info.rs
+++ b/arrow-flight/src/sql/metadata/sql_info.rs
@@ -15,26 +15,34 @@
// specific language governing permissions and limitations
// under the License.
-//! [`SqlInfoList`] for building responses to [`CommandGetSqlInfo`] queries.
+//! Helpers for building responses to [`CommandGetSqlInfo`] metadata requests.
+//!
+//! - [`SqlInfoDataBuilder`] - a builder for collecting sql infos
+//! and building a conformant `RecordBatch` with sql info server metadata.
+//! - [`SqlInfoData`] - a helper type wrapping a `RecordBatch`
+//! used for storing sql info server metadata.
+//! - [`GetSqlInfoBuilder`] - a builder for constructing [`CommandGetSqlInfo`] responses.
//!
-//! [`CommandGetSqlInfo`]: crate::sql::CommandGetSqlInfo
-use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
-use arrow_array::array::{Array, UnionArray};
+use arrow_arith::boolean::or;
+use arrow_array::array::{Array, UInt32Array, UnionArray};
use arrow_array::builder::{
ArrayBuilder, BooleanBuilder, Int32Builder, Int64Builder, Int8Builder, ListBuilder,
MapBuilder, StringBuilder, UInt32Builder,
};
+use arrow_array::cast::downcast_array;
use arrow_array::RecordBatch;
use arrow_data::ArrayData;
-use arrow_schema::{DataType, Field, Fields, Schema, UnionFields, UnionMode};
+use arrow_ord::comparison::eq_scalar;
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, UnionFields, UnionMode};
+use arrow_select::filter::filter_record_batch;
use once_cell::sync::Lazy;
use crate::error::Result;
-use crate::sql::SqlInfo;
+use crate::sql::{CommandGetSqlInfo, SqlInfo};
/// Represents a dynamic value
#[derive(Debug, Clone, PartialEq)]
@@ -321,39 +329,15 @@ impl SqlInfoUnionBuilder {
}
}
-/// A builder for [`CommandGetSqlInfo`] response.
+/// Helper to create [`CommandGetSqlInfo`] responses.
///
/// [`CommandGetSqlInfo`] are metadata requests used by a Flight SQL
-/// server to communicate supported capabilities to Flight SQL
-/// clients.
-///
-/// Servers construct a [`SqlInfoList`] by adding infos via
-/// [`with_sql_info`] and build the response using [`encode`].
-///
-/// The available configuration options are defined in the [Flight SQL protos][protos].
-///
-/// # Example
-/// ```
-/// # use arrow_flight::sql::{metadata::SqlInfoList, SqlInfo, SqlSupportedTransaction};
-/// // Create the list of metadata describing the server
-/// let info_list = SqlInfoList::new()
-/// .with_sql_info(SqlInfo::FlightSqlServerName, "server name")
-/// // ... add other SqlInfo here ..
-/// .with_sql_info(
-/// SqlInfo::FlightSqlServerTransaction,
-/// SqlSupportedTransaction::Transaction as i32,
-/// );
+/// server to communicate supported capabilities to Flight SQL clients.
///
-/// // Create the batch to send back to the client
-/// let batch = info_list.encode().unwrap();
-/// ```
-///
-/// [protos]: https://github.com/apache/arrow/blob/6d3d2fca2c9693231fa1e52c142ceef563fc23f9/format/FlightSql.proto#L71-L820
-/// [`CommandGetSqlInfo`]: crate::sql::CommandGetSqlInfo
-/// [`with_sql_info`]: SqlInfoList::with_sql_info
-/// [`encode`]: SqlInfoList::encode
+/// Servers construct - usually static - [`SqlInfoData`] via the [`SqlInfoDataBuilder`],
+/// and build responses using the [`GetSqlInfoBuilder`].
#[derive(Debug, Clone, PartialEq)]
-pub struct SqlInfoList {
+pub struct SqlInfoDataBuilder {
/// Use BTreeMap to ensure the values are sorted by value as
/// to make output consistent
///
@@ -362,13 +346,13 @@ pub struct SqlInfoList {
infos: BTreeMap<u32, SqlInfoValue>,
}
-impl Default for SqlInfoList {
+impl Default for SqlInfoDataBuilder {
fn default() -> Self {
Self::new()
}
}
-impl SqlInfoList {
+impl SqlInfoDataBuilder {
pub fn new() -> Self {
Self {
infos: BTreeMap::new(),
@@ -376,40 +360,23 @@ impl SqlInfoList {
}
/// register the specific sql metadata item
- pub fn with_sql_info(
- mut self,
- name: impl SqlInfoName,
- value: impl Into<SqlInfoValue>,
- ) -> Self {
+ pub fn append(&mut self, name: impl SqlInfoName, value: impl Into<SqlInfoValue>) {
self.infos.insert(name.as_u32(), value.into());
- self
- }
-
- /// Filter this info list keeping only the info values specified
- /// in `infos`.
- ///
- /// Returns self if infos is empty (no filtering)
- pub fn filter(&self, info: &[u32]) -> Cow<'_, Self> {
- if info.is_empty() {
- Cow::Borrowed(self)
- } else {
- let infos: BTreeMap<_, _> = info
- .iter()
- .filter_map(|name| self.infos.get(name).map(|v| (*name, v.clone())))
- .collect();
- Cow::Owned(Self { infos })
- }
}
/// Encode the contents of this list according to the [FlightSQL spec]
///
/// [FlightSQL spec]: (https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43
- pub fn encode(&self) -> Result<RecordBatch> {
+ pub fn build(self) -> Result<SqlInfoData> {
let mut name_builder = UInt32Builder::new();
let mut value_builder = SqlInfoUnionBuilder::new();
- for (&name, value) in self.infos.iter() {
- name_builder.append_value(name);
+ let mut names: Vec<_> = self.infos.keys().cloned().collect();
+ names.sort_unstable();
+
+ for key in names {
+ let (name, value) = self.infos.get_key_value(&key).unwrap();
+ name_builder.append_value(*name);
value_builder.append_value(value)?
}
@@ -417,7 +384,8 @@ impl SqlInfoList {
("info_name", Arc::new(name_builder.finish()) as _),
("value", Arc::new(value_builder.finish()) as _),
])?;
- Ok(batch)
+
+ Ok(SqlInfoData { batch })
}
/// Return the [`Schema`] for a GetSchema RPC call with [`crate::sql::CommandGetSqlInfo`]
@@ -427,7 +395,89 @@ impl SqlInfoList {
}
}
-// The schema produced by [`SqlInfoList`]
+/// A builder for [`SqlInfoData`] which is used to create [`CommandGetSqlInfo`] responses.
+///
+/// # Example
+/// ```
+/// # use arrow_flight::sql::{metadata::SqlInfoDataBuilder, SqlInfo, SqlSupportedTransaction};
+/// // Create the list of metadata describing the server
+/// let mut builder = SqlInfoDataBuilder::new();
+/// builder.append(SqlInfo::FlightSqlServerName, "server name");
+/// // ... add other SqlInfo here ..
+/// builder.append(
+/// SqlInfo::FlightSqlServerTransaction,
+/// SqlSupportedTransaction::Transaction as i32,
+/// );
+///
+/// // Build the `SqlInfoData` used to answer client requests
+/// let info_data = builder.build().unwrap();
+/// ```
+///
+/// [protos]: https://github.com/apache/arrow/blob/6d3d2fca2c9693231fa1e52c142ceef563fc23f9/format/FlightSql.proto#L71-L820
+pub struct SqlInfoData {
+ batch: RecordBatch,
+}
+
+impl SqlInfoData {
+ /// Return a [`RecordBatch`] containing only the rows whose `info_name` is one of
+ /// the requested `u32` codes from [`CommandGetSqlInfo`]; an empty request returns all rows
+ pub fn record_batch(
+ &self,
+ info: impl IntoIterator<Item = u32>,
+ ) -> Result<RecordBatch> {
+ let arr: UInt32Array = downcast_array(self.batch.column(0).as_ref());
+ let type_filter = info
+ .into_iter()
+ .map(|tt| eq_scalar(&arr, tt))
+ .collect::<std::result::Result<Vec<_>, _>>()?
+ .into_iter()
+ // We know the arrays are of same length as they are produced from the same root array
+ .reduce(|filter, arr| or(&filter, &arr).unwrap());
+ if let Some(filter) = type_filter {
+ Ok(filter_record_batch(&self.batch, &filter)?)
+ } else {
+ Ok(self.batch.clone())
+ }
+ }
+
+ /// Return the schema of the RecordBatch that will be returned
+ /// from [`CommandGetSqlInfo`]
+ pub fn schema(&self) -> SchemaRef {
+ self.batch.schema()
+ }
+}
+
+/// A builder for a [`CommandGetSqlInfo`] response.
+pub struct GetSqlInfoBuilder<'a> {
+ /// Requested `SqlInfo`s. If empty, all infos are returned.
+ info: Vec<u32>,
+ infos: &'a SqlInfoData,
+}
+
+impl CommandGetSqlInfo {
+ /// Create a builder suitable for constructing a response
+ pub fn into_builder(self, infos: &SqlInfoData) -> GetSqlInfoBuilder {
+ GetSqlInfoBuilder {
+ info: self.info,
+ infos,
+ }
+ }
+}
+
+impl GetSqlInfoBuilder<'_> {
+ /// Builds a `RecordBatch` with the correct schema for a [`CommandGetSqlInfo`] response
+ pub fn build(self) -> Result<RecordBatch> {
+ self.infos.record_batch(self.info)
+ }
+
+ /// Return the schema of the RecordBatch that will be returned
+ /// from [`CommandGetSqlInfo`]
+ pub fn schema(&self) -> SchemaRef {
+ self.infos.schema()
+ }
+}
+
+// The schema produced by [`SqlInfoData`]
static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
Schema::new(vec![
Field::new("info_name", DataType::UInt32, false),
@@ -439,7 +489,7 @@ static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
mod tests {
use std::collections::HashMap;
- use super::SqlInfoList;
+ use super::SqlInfoDataBuilder;
use crate::sql::metadata::tests::assert_batches_eq;
use crate::sql::{
SqlInfo, SqlNullOrdering, SqlSupportedTransaction, SqlSupportsConvert,
@@ -456,23 +506,23 @@ mod tests {
],
);
- let batch = SqlInfoList::new()
- // str
- .with_sql_info(SqlInfo::SqlIdentifierQuoteChar, r#"""#)
- // bool
- .with_sql_info(SqlInfo::SqlDdlCatalog, false)
- // i32
- .with_sql_info(
- SqlInfo::SqlNullOrdering,
- SqlNullOrdering::SqlNullsSortedHigh as i32,
- )
- // i64
- .with_sql_info(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64)
- // [str]
- .with_sql_info(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str])
- .with_sql_info(SqlInfo::SqlSupportsConvert, &convert)
- .encode()
- .unwrap();
+ let mut builder = SqlInfoDataBuilder::new();
+ // str
+ builder.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#);
+ // bool
+ builder.append(SqlInfo::SqlDdlCatalog, false);
+ // i32
+ builder.append(
+ SqlInfo::SqlNullOrdering,
+ SqlNullOrdering::SqlNullsSortedHigh as i32,
+ );
+ // i64
+ builder.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64);
+ // [str]
+ builder.append(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str]);
+ builder.append(SqlInfo::SqlSupportsConvert, &convert);
+
+ let batch = builder.build().unwrap().record_batch(None).unwrap();
let expected = vec![
"+-----------+----------------------------------------+",
@@ -492,27 +542,26 @@ mod tests {
#[test]
fn test_filter_sql_infos() {
- let info_list = SqlInfoList::new()
- .with_sql_info(SqlInfo::FlightSqlServerName, "server name")
- .with_sql_info(
- SqlInfo::FlightSqlServerTransaction,
- SqlSupportedTransaction::Transaction as i32,
- );
-
- let batch = info_list.encode().unwrap();
+ let mut builder = SqlInfoDataBuilder::new();
+ builder.append(SqlInfo::FlightSqlServerName, "server name");
+ builder.append(
+ SqlInfo::FlightSqlServerTransaction,
+ SqlSupportedTransaction::Transaction as i32,
+ );
+ let data = builder.build().unwrap();
+
+ let batch = data.record_batch(None).unwrap();
assert_eq!(batch.num_rows(), 2);
- let batch = info_list
- .filter(&[SqlInfo::FlightSqlServerTransaction as u32])
- .encode()
- .unwrap();
- let ref_batch = SqlInfoList::new()
- .with_sql_info(
- SqlInfo::FlightSqlServerTransaction,
- SqlSupportedTransaction::Transaction as i32,
- )
- .encode()
+ let batch = data
+ .record_batch([SqlInfo::FlightSqlServerTransaction as u32])
.unwrap();
+ let mut ref_builder = SqlInfoDataBuilder::new();
+ ref_builder.append(
+ SqlInfo::FlightSqlServerTransaction,
+ SqlSupportedTransaction::Transaction as i32,
+ );
+ let ref_batch = ref_builder.build().unwrap().record_batch(None).unwrap();
assert_eq!(batch, ref_batch);
}