You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/01 20:34:09 UTC
[arrow-rs] branch master updated: feat(flight): add helpers to handle `CommandGetCatalogs`, `CommandGetSchemas`, and `CommandGetTables` requests (#4296)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new dde653912 feat(flight): add helpers to handle `CommandGetCatalogs`, `CommandGetSchemas`, and `CommandGetTables` requests (#4296)
dde653912 is described below
commit dde65391263039f1431df9eca6a429de0b9457b3
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Thu Jun 1 22:34:03 2023 +0200
feat(flight): add helpers to handle `CommandGetCatalogs`, `CommandGetSchemas`, and `CommandGetTables` requests (#4296)
* feat: add get catalog helpers
* fix: clippy in arrow_row example
* feat: add db schemas helpers
* chore: cleanup db schemas helpers
* feat: add table schema helpers
* test: add tests and docs
* docs: add table queries to example server
* docs: improve builder docs
* fix: docs links
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* Improve docs and tests for `SqlInfoList (#4293)
* Improve docs and tests for SqlInfoList
* Add an example/
* Update arrow-flight/src/sql/sql_info.rs
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---------
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
* fix: use FlightInfo builders
* chore: clippy
* fmt
* feat: add filters to GetTablesBuilder
* fix: clippy
* feat: more consistent builder apis
* chore: cleanup
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---
arrow-flight/Cargo.toml | 9 +-
arrow-flight/examples/flight_sql_server.rs | 140 +++++++--
arrow-flight/src/sql/catalogs/db_schemas.rs | 284 +++++++++++++++++
arrow-flight/src/sql/catalogs/mod.rs | 123 ++++++++
arrow-flight/src/sql/catalogs/tables.rs | 466 ++++++++++++++++++++++++++++
arrow-flight/src/sql/mod.rs | 1 +
arrow-row/src/lib.rs | 2 +-
7 files changed, 1001 insertions(+), 24 deletions(-)
diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index 206cc6505..ae9759b66 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -27,13 +27,18 @@ repository = { workspace = true }
license = { workspace = true }
[dependencies]
+arrow-arith = { workspace = true, optional = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
arrow-cast = { workspace = true }
-arrow-data = { workspace = true }
+arrow-data = { workspace = true, optional = true }
arrow-ipc = { workspace = true }
+arrow-ord = { workspace = true, optional = true }
+arrow-row = { workspace = true, optional = true }
+arrow-select = { workspace = true, optional = true }
arrow-schema = { workspace = true }
+arrow-string = { workspace = true, optional = true }
base64 = { version = "0.21", default-features = false, features = ["std"] }
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
@@ -53,7 +58,7 @@ all-features = true
[features]
default = []
-flight-sql-experimental = ["once_cell"]
+flight-sql-experimental = ["arrow-arith", "arrow-data", "arrow-ord", "arrow-row", "arrow-select", "arrow-string", "once_cell"]
tls = ["tonic/tls"]
# Enable CLI tools
diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs
index 783e0bf5b..6b92621a5 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -20,6 +20,7 @@ use base64::Engine;
use futures::{stream, Stream, TryStreamExt};
use once_cell::sync::Lazy;
use prost::Message;
+use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use tonic::transport::Server;
@@ -29,6 +30,9 @@ use tonic::{Request, Response, Status, Streaming};
use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
+use arrow_flight::sql::catalogs::{
+ get_catalogs_schema, get_db_schemas_schema, get_tables_schema,
+};
use arrow_flight::sql::sql_info::SqlInfoList;
use arrow_flight::sql::{
server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
@@ -72,6 +76,8 @@ static INSTANCE_SQL_INFO: Lazy<SqlInfoList> = Lazy::new(|| {
.with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3")
});
+static TABLES: Lazy<Vec<&'static str>> = Lazy::new(|| vec!["flight_sql.example.table"]);
+
#[derive(Clone)]
pub struct FlightSqlServiceImpl {}
@@ -236,32 +242,62 @@ impl FlightSqlService for FlightSqlServiceImpl {
async fn get_flight_info_catalogs(
&self,
- _query: CommandGetCatalogs,
- _request: Request<FlightDescriptor>,
+ query: CommandGetCatalogs,
+ request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
- Err(Status::unimplemented(
- "get_flight_info_catalogs not implemented",
- ))
+ let flight_descriptor = request.into_inner();
+ let ticket = Ticket {
+ ticket: query.encode_to_vec().into(),
+ };
+ let endpoint = FlightEndpoint::new().with_ticket(ticket);
+
+ let flight_info = FlightInfo::new()
+ .try_with_schema(get_catalogs_schema())
+ .map_err(|e| status!("Unable to encode schema", e))?
+ .with_endpoint(endpoint)
+ .with_descriptor(flight_descriptor);
+
+ Ok(tonic::Response::new(flight_info))
}
async fn get_flight_info_schemas(
&self,
- _query: CommandGetDbSchemas,
- _request: Request<FlightDescriptor>,
+ query: CommandGetDbSchemas,
+ request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
- Err(Status::unimplemented(
- "get_flight_info_schemas not implemented",
- ))
+ let flight_descriptor = request.into_inner();
+ let ticket = Ticket {
+ ticket: query.encode_to_vec().into(),
+ };
+ let endpoint = FlightEndpoint::new().with_ticket(ticket);
+
+ let flight_info = FlightInfo::new()
+ .try_with_schema(get_db_schemas_schema().as_ref())
+ .map_err(|e| status!("Unable to encode schema", e))?
+ .with_endpoint(endpoint)
+ .with_descriptor(flight_descriptor);
+
+ Ok(tonic::Response::new(flight_info))
}
async fn get_flight_info_tables(
&self,
- _query: CommandGetTables,
- _request: Request<FlightDescriptor>,
+ query: CommandGetTables,
+ request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
- Err(Status::unimplemented(
- "get_flight_info_tables not implemented",
- ))
+ let flight_descriptor = request.into_inner();
+ let ticket = Ticket {
+ ticket: query.encode_to_vec().into(),
+ };
+ let endpoint = FlightEndpoint::new().with_ticket(ticket);
+
+ let flight_info = FlightInfo::new()
+ .try_with_schema(get_tables_schema(query.include_schema).as_ref())
+ .map_err(|e| status!("Unable to encode schema", e))?
+ .with_endpoint(endpoint)
+ .with_descriptor(flight_descriptor);
+
+ Ok(tonic::Response::new(flight_info))
}
async fn get_flight_info_table_types(
@@ -363,26 +399,88 @@ impl FlightSqlService for FlightSqlServiceImpl {
async fn do_get_catalogs(
&self,
- _query: CommandGetCatalogs,
+ query: CommandGetCatalogs,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_catalogs not implemented"))
+ let catalog_names = TABLES
+ .iter()
+ .map(|full_name| full_name.split('.').collect::<Vec<_>>()[0].to_string())
+ .collect::<HashSet<_>>();
+ let mut builder = query.into_builder();
+ for catalog_name in catalog_names {
+ builder.append(catalog_name);
+ }
+ let batch = builder.build();
+ let stream = FlightDataEncoderBuilder::new()
+ .with_schema(Arc::new(get_catalogs_schema().clone()))
+ .build(futures::stream::once(async { batch }))
+ .map_err(Status::from);
+ Ok(Response::new(Box::pin(stream)))
}
async fn do_get_schemas(
&self,
- _query: CommandGetDbSchemas,
+ query: CommandGetDbSchemas,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_schemas not implemented"))
+ let schemas = TABLES
+ .iter()
+ .map(|full_name| {
+ let parts = full_name.split('.').collect::<Vec<_>>();
+ (parts[0].to_string(), parts[1].to_string())
+ })
+ .collect::<HashSet<_>>();
+
+ let mut builder = query.into_builder();
+ for (catalog_name, schema_name) in schemas {
+ builder.append(catalog_name, schema_name);
+ }
+
+ let batch = builder.build();
+ let stream = FlightDataEncoderBuilder::new()
+ .with_schema(get_db_schemas_schema())
+ .build(futures::stream::once(async { batch }))
+ .map_err(Status::from);
+ Ok(Response::new(Box::pin(stream)))
}
async fn do_get_tables(
&self,
- _query: CommandGetTables,
+ query: CommandGetTables,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_tables not implemented"))
+ let tables = TABLES
+ .iter()
+ .map(|full_name| {
+ let parts = full_name.split('.').collect::<Vec<_>>();
+ (
+ parts[0].to_string(),
+ parts[1].to_string(),
+ parts[2].to_string(),
+ )
+ })
+ .collect::<HashSet<_>>();
+
+ let dummy_schema = Schema::empty();
+ let mut builder = query.into_builder();
+ for (catalog_name, schema_name, table_name) in tables {
+ builder
+ .append(
+ catalog_name,
+ schema_name,
+ table_name,
+ "TABLE",
+ &dummy_schema,
+ )
+ .map_err(Status::from)?;
+ }
+
+ let batch = builder.build();
+ let stream = FlightDataEncoderBuilder::new()
+ .with_schema(get_db_schemas_schema())
+ .build(futures::stream::once(async { batch }))
+ .map_err(Status::from);
+ Ok(Response::new(Box::pin(stream)))
}
async fn do_get_table_types(
diff --git a/arrow-flight/src/sql/catalogs/db_schemas.rs b/arrow-flight/src/sql/catalogs/db_schemas.rs
new file mode 100644
index 000000000..76c5499c8
--- /dev/null
+++ b/arrow-flight/src/sql/catalogs/db_schemas.rs
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetSchemasBuilder`] for building responses to [`CommandGetDbSchemas`] queries.
+//!
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+
+use std::sync::Arc;
+
+use arrow_arith::boolean::and;
+use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch};
+use arrow_ord::comparison::eq_utf8_scalar;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+use crate::sql::CommandGetDbSchemas;
+
+/// Return the schema of the RecordBatch that will be returned from [`CommandGetDbSchemas`]
+///
+/// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+pub fn get_db_schemas_schema() -> SchemaRef {
+ Arc::clone(&GET_DB_SCHEMAS_SCHEMA)
+}
+
+/// The schema for GetDbSchemas
+static GET_DB_SCHEMAS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ ]))
+});
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+pub struct GetSchemasBuilder {
+ // Specifies the Catalog to search for the tables.
+ // - An empty string retrieves those without a catalog.
+ // - If omitted the catalog name is not used to narrow the search.
+ catalog_filter: Option<String>,
+ // Optional filters to apply
+ db_schema_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for schema names
+ db_schema_name: StringBuilder,
+}
+
+impl CommandGetDbSchemas {
+ pub fn into_builder(self) -> GetSchemasBuilder {
+ self.into()
+ }
+}
+
+impl From<CommandGetDbSchemas> for GetSchemasBuilder {
+ fn from(value: CommandGetDbSchemas) -> Self {
+ Self::new(value.catalog, value.db_schema_filter_pattern)
+ }
+}
+
+impl GetSchemasBuilder {
+ /// Create a new instance of [`GetSchemasBuilder`]
+ ///
+ /// # Parameters
+ ///
+ /// - `catalog`: Specifies the Catalog to search for the tables.
+ /// - An empty string retrieves those without a catalog.
+ /// - If omitted the catalog name is not used to narrow the search.
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow the search.
+ /// In the pattern string, two special characters can be used to denote matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ ///
+ /// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+ pub fn new(
+ catalog: Option<impl Into<String>>,
+ db_schema_filter_pattern: Option<impl Into<String>>,
+ ) -> Self {
+ Self {
+ catalog_filter: catalog.map(|v| v.into()),
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|v| v.into()),
+ catalog_name: StringBuilder::new(),
+ db_schema_name: StringBuilder::new(),
+ }
+ }
+
+ /// Append a row
+ ///
+ /// In case the catalog should be considered as empty, pass in an empty string '""'.
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ ) {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ }
+
+ /// builds a `RecordBatch` with the correct schema for a `CommandGetDbSchemas` response
+ pub fn build(self) -> Result<RecordBatch> {
+ let Self {
+ catalog_filter,
+ db_schema_filter_pattern,
+ mut catalog_name,
+ mut db_schema_name,
+ } = self;
+
+ // Make the arrays
+ let catalog_name = catalog_name.finish();
+ let db_schema_name = db_schema_name.finish();
+
+ let mut filters = vec![];
+
+ if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
+ // use like kernel to get wildcard matching
+ filters.push(like_utf8_scalar(
+ &db_schema_name,
+ &db_schema_filter_pattern,
+ )?)
+ }
+
+ if let Some(catalog_filter_name) = catalog_filter {
+ filters.push(eq_utf8_scalar(&catalog_name, &catalog_filter_name)?);
+ }
+
+ // `AND` any filters together
+ let mut total_filter = None;
+ while let Some(filter) = filters.pop() {
+ let new_filter = match total_filter {
+ Some(total_filter) => and(&total_filter, &filter)?,
+ None => filter,
+ };
+ total_filter = Some(new_filter);
+ }
+
+ let batch = RecordBatch::try_new(
+ get_db_schemas_schema(),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ ],
+ )?;
+
+ // Apply the filters if needed
+ let filtered_batch = if let Some(filter) = total_filter {
+ filter_record_batch(&batch, &filter)?
+ } else {
+ batch
+ };
+
+ // Order filtered results by catalog_name, then db_schema_name
+ let indices = lexsort_to_indices(filtered_batch.columns());
+ let columns = filtered_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+
+ Ok(RecordBatch::try_new(get_db_schemas_schema(), columns)?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{StringArray, UInt32Array};
+
+ fn get_ref_batch() -> RecordBatch {
+ RecordBatch::try_new(
+ get_db_schemas_schema(),
+ vec![
+ Arc::new(StringArray::from(vec![
+ "a_catalog",
+ "a_catalog",
+ "b_catalog",
+ "b_catalog",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "a_schema", "b_schema", "a_schema", "b_schema",
+ ])) as ArrayRef,
+ ],
+ )
+ .unwrap()
+ }
+
+ #[test]
+ fn test_schemas_are_filtered() {
+ let ref_batch = get_ref_batch();
+
+ let mut builder = GetSchemasBuilder::new(None::<String>, None::<String>);
+ builder.append("a_catalog", "a_schema");
+ builder.append("a_catalog", "b_schema");
+ builder.append("b_catalog", "a_schema");
+ builder.append("b_catalog", "b_schema");
+ let schema_batch = builder.build().unwrap();
+
+ assert_eq!(schema_batch, ref_batch);
+
+ let mut builder = GetSchemasBuilder::new(None::<String>, Some("a%"));
+ builder.append("a_catalog", "a_schema");
+ builder.append("a_catalog", "b_schema");
+ builder.append("b_catalog", "a_schema");
+ builder.append("b_catalog", "b_schema");
+ let schema_batch = builder.build().unwrap();
+
+ let indices = UInt32Array::from(vec![0, 2]);
+ let ref_filtered = RecordBatch::try_new(
+ get_db_schemas_schema(),
+ ref_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .unwrap(),
+ )
+ .unwrap();
+
+ assert_eq!(schema_batch, ref_filtered);
+ }
+
+ #[test]
+ fn test_schemas_are_sorted() {
+ let ref_batch = get_ref_batch();
+
+ let mut builder = GetSchemasBuilder::new(None::<String>, None::<String>);
+ builder.append("a_catalog", "b_schema");
+ builder.append("b_catalog", "a_schema");
+ builder.append("a_catalog", "a_schema");
+ builder.append("b_catalog", "b_schema");
+ let schema_batch = builder.build().unwrap();
+
+ assert_eq!(schema_batch, ref_batch)
+ }
+
+ #[test]
+ fn test_builder_from_query() {
+ let ref_batch = get_ref_batch();
+ let query = CommandGetDbSchemas {
+ catalog: Some("a_catalog".into()),
+ db_schema_filter_pattern: Some("b%".into()),
+ };
+
+ let mut builder = query.into_builder();
+ builder.append("a_catalog", "a_schema");
+ builder.append("a_catalog", "b_schema");
+ builder.append("b_catalog", "a_schema");
+ builder.append("b_catalog", "b_schema");
+ let schema_batch = builder.build().unwrap();
+
+ let indices = UInt32Array::from(vec![1]);
+ let ref_filtered = RecordBatch::try_new(
+ get_db_schemas_schema(),
+ ref_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .unwrap(),
+ )
+ .unwrap();
+
+ assert_eq!(schema_batch, ref_filtered);
+ }
+}
diff --git a/arrow-flight/src/sql/catalogs/mod.rs b/arrow-flight/src/sql/catalogs/mod.rs
new file mode 100644
index 000000000..e4cbb6fed
--- /dev/null
+++ b/arrow-flight/src/sql/catalogs/mod.rs
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Builders and function for building responses to information schema requests
+//!
+//! - [`get_catalogs_batch`] and [`get_catalogs_schema`] for building responses to [`CommandGetCatalogs`] queries.
+//! - [`GetSchemasBuilder`] and [`get_db_schemas_schema`] for building responses to [`CommandGetDbSchemas`] queries.
+//! - [`GetTablesBuilder`] and [`get_tables_schema`] for building responses to [`CommandGetTables`] queries.
+//!
+//! [`CommandGetCatalogs`]: crate::sql::CommandGetCatalogs
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt32Array};
+use arrow_row::{RowConverter, SortField};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use once_cell::sync::Lazy;
+
+use crate::error::Result;
+use crate::sql::CommandGetCatalogs;
+
+pub use db_schemas::{get_db_schemas_schema, GetSchemasBuilder};
+pub use tables::{get_tables_schema, GetTablesBuilder};
+
+mod db_schemas;
+mod tables;
+
+pub struct GetCatalogsBuilder {
+ catalogs: Vec<String>,
+}
+
+impl CommandGetCatalogs {
+ pub fn into_builder(self) -> GetCatalogsBuilder {
+ self.into()
+ }
+}
+
+impl From<CommandGetCatalogs> for GetCatalogsBuilder {
+ fn from(_: CommandGetCatalogs) -> Self {
+ Self::new()
+ }
+}
+
+impl Default for GetCatalogsBuilder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl GetCatalogsBuilder {
+ /// Create a new instance of [`GetCatalogsBuilder`]
+ pub fn new() -> Self {
+ Self {
+ catalogs: Vec::new(),
+ }
+ }
+
+ /// Append a row
+ pub fn append(&mut self, catalog_name: impl Into<String>) {
+ self.catalogs.push(catalog_name.into());
+ }
+
+ /// builds a `RecordBatch` with the correct schema for a `CommandGetCatalogs` response
+ pub fn build(self) -> Result<RecordBatch> {
+ get_catalogs_batch(self.catalogs)
+ }
+}
+
+/// Returns the RecordBatch for `CommandGetCatalogs`
+pub fn get_catalogs_batch(mut catalog_names: Vec<String>) -> Result<RecordBatch> {
+ catalog_names.sort_unstable();
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&GET_CATALOG_SCHEMA),
+ vec![Arc::new(StringArray::from_iter_values(catalog_names)) as _],
+ )?;
+
+ Ok(batch)
+}
+
+/// Returns the schema that will result from [`CommandGetCatalogs`]
+///
+/// [`CommandGetCatalogs`]: crate::sql::CommandGetCatalogs
+pub fn get_catalogs_schema() -> &'static Schema {
+ &GET_CATALOG_SCHEMA
+}
+
+/// The schema for GetCatalogs
+static GET_CATALOG_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![Field::new(
+ "catalog_name",
+ DataType::Utf8,
+ false,
+ )]))
+});
+
+fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
+ let fields = arrays
+ .iter()
+ .map(|a| SortField::new(a.data_type().clone()))
+ .collect();
+ let mut converter = RowConverter::new(fields).unwrap();
+ let rows = converter.convert_columns(arrays).unwrap();
+ let mut sort: Vec<_> = rows.iter().enumerate().collect();
+ sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+ UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
+}
diff --git a/arrow-flight/src/sql/catalogs/tables.rs b/arrow-flight/src/sql/catalogs/tables.rs
new file mode 100644
index 000000000..fcdc0dbb7
--- /dev/null
+++ b/arrow-flight/src/sql/catalogs/tables.rs
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`] queries.
+//!
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_arith::boolean::{and, or};
+use arrow_array::builder::{BinaryBuilder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_ord::comparison::eq_utf8_scalar;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+use crate::sql::CommandGetTables;
+use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc};
+
+/// Return the schema of the RecordBatch that will be returned from [`CommandGetTables`]
+///
+/// Note the schema differs based on the value of `include_schema`
+///
+/// [`CommandGetTables`]: crate::sql::CommandGetTables
+pub fn get_tables_schema(include_schema: bool) -> SchemaRef {
+ if include_schema {
+ Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA)
+ } else {
+ Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA)
+ }
+}
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+/// * table_name: utf8 not null,
+/// * table_type: utf8 not null,
+/// * (optional) table_schema: bytes not null (schema of the table as described
+/// in Schema.fbs::Schema; it is serialized as an IPC message.)
+pub struct GetTablesBuilder {
+ catalog_filter: Option<String>,
+ table_types_filter: Vec<String>,
+ // Optional filters to apply to schemas
+ db_schema_filter_pattern: Option<String>,
+ // Optional filters to apply to tables
+ table_name_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for db schema names
+ db_schema_name: StringBuilder,
+ // array builder for tables names
+ table_name: StringBuilder,
+ // array builder for table types
+ table_type: StringBuilder,
+ // array builder for table schemas
+ table_schema: Option<BinaryBuilder>,
+}
+
+impl CommandGetTables {
+ pub fn into_builder(self) -> GetTablesBuilder {
+ self.into()
+ }
+}
+
+impl From<CommandGetTables> for GetTablesBuilder {
+ fn from(value: CommandGetTables) -> Self {
+ Self::new(
+ value.catalog,
+ value.db_schema_filter_pattern,
+ value.table_name_filter_pattern,
+ value.table_types,
+ value.include_schema,
+ )
+ }
+}
+
+impl GetTablesBuilder {
+ /// Create a new instance of [`GetTablesBuilder`]
+ ///
+ /// # Parameters
+ ///
+ /// - `catalog`: Specifies the Catalog to search for the tables.
+ /// - An empty string retrieves those without a catalog.
+ /// - If omitted the catalog name is not used to narrow the search.
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow the search.
+ /// In the pattern string, two special characters can be used to denote matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `table_name_filter_pattern`: Specifies a filter pattern for tables to search for.
+ /// When no pattern is provided, all tables matching other filters are searched.
+ /// In the pattern string, two special characters can be used to denote matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `table_types`: Specifies a filter of table types which must match.
+ /// An empty Vec matches all table types.
+ /// - `include_schema`: Specifies if the Arrow schema should be returned for found tables.
+ ///
+ /// [`CommandGetTables`]: crate::sql::CommandGetTables
+ pub fn new(
+ catalog: Option<impl Into<String>>,
+ db_schema_filter_pattern: Option<impl Into<String>>,
+ table_name_filter_pattern: Option<impl Into<String>>,
+ table_types: impl IntoIterator<Item = impl Into<String>>,
+ include_schema: bool,
+ ) -> Self {
+ let table_schema = if include_schema {
+ Some(BinaryBuilder::new())
+ } else {
+ None
+ };
+ Self {
+ catalog_filter: catalog.map(|s| s.into()),
+ table_types_filter: table_types.into_iter().map(|tt| tt.into()).collect(),
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()),
+ table_name_filter_pattern: table_name_filter_pattern.map(|t| t.into()),
+ catalog_name: StringBuilder::new(),
+ db_schema_name: StringBuilder::new(),
+ table_name: StringBuilder::new(),
+ table_type: StringBuilder::new(),
+ table_schema,
+ }
+ }
+
+ /// Append a row
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ table_name: impl AsRef<str>,
+ table_type: impl AsRef<str>,
+ table_schema: &Schema,
+ ) -> Result<()> {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ self.table_name.append_value(table_name);
+ self.table_type.append_value(table_type);
+ if let Some(self_table_schema) = self.table_schema.as_mut() {
+ let options = IpcWriteOptions::default();
+ // encode the schema into the correct form
+ let message: std::result::Result<IpcMessage, _> =
+ SchemaAsIpc::new(table_schema, &options).try_into();
+ let IpcMessage(schema) = message?;
+ self_table_schema.append_value(schema);
+ }
+
+ Ok(())
+ }
+
+ /// builds a `RecordBatch` for `CommandGetTables`
+ pub fn build(self) -> Result<RecordBatch> {
+ let Self {
+ catalog_filter,
+ table_types_filter,
+ db_schema_filter_pattern,
+ table_name_filter_pattern,
+
+ mut catalog_name,
+ mut db_schema_name,
+ mut table_name,
+ mut table_type,
+ table_schema,
+ } = self;
+
+ // Make the arrays
+ let catalog_name = catalog_name.finish();
+ let db_schema_name = db_schema_name.finish();
+ let table_name = table_name.finish();
+ let table_type = table_type.finish();
+ let table_schema = table_schema.map(|mut table_schema| table_schema.finish());
+
+ // apply any filters, getting a BooleanArray that represents
+ // the rows that passed the filter
+ let mut filters = vec![];
+
+ if let Some(catalog_filter_name) = catalog_filter {
+ filters.push(eq_utf8_scalar(&catalog_name, &catalog_filter_name)?);
+ }
+
+ let tt_filter = table_types_filter
+ .into_iter()
+ .map(|tt| eq_utf8_scalar(&table_type, &tt))
+ .collect::<std::result::Result<Vec<_>, _>>()?
+ .into_iter()
+ // We know the arrays are of the same length as they are produced from the same root array
+ .reduce(|filter, arr| or(&filter, &arr).unwrap());
+ if let Some(filter) = tt_filter {
+ filters.push(filter);
+ }
+
+ if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
+ // use like kernel to get wildcard matching
+ filters.push(like_utf8_scalar(
+ &db_schema_name,
+ &db_schema_filter_pattern,
+ )?)
+ }
+
+ if let Some(table_name_filter_pattern) = table_name_filter_pattern {
+ // use like kernel to get wildcard matching
+ filters.push(like_utf8_scalar(&table_name, &table_name_filter_pattern)?)
+ }
+
+ let include_schema = table_schema.is_some();
+ let batch = if let Some(table_schema) = table_schema {
+ RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ Arc::new(table_name) as ArrayRef,
+ Arc::new(table_type) as ArrayRef,
+ Arc::new(table_schema) as ArrayRef,
+ ],
+ )
+ } else {
+ RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ Arc::new(table_name) as ArrayRef,
+ Arc::new(table_type) as ArrayRef,
+ ],
+ )
+ }?;
+
+ // `AND` any filters together
+ let mut total_filter = None;
+ while let Some(filter) = filters.pop() {
+ let new_filter = match total_filter {
+ Some(total_filter) => and(&total_filter, &filter)?,
+ None => filter,
+ };
+ total_filter = Some(new_filter);
+ }
+
+ // Apply the filters if needed
+ let filtered_batch = if let Some(total_filter) = total_filter {
+ filter_record_batch(&batch, &total_filter)?
+ } else {
+ batch
+ };
+
+ // Order filtered results by catalog_name, then db_schema_name, then table_name, then table_type
+ // https://github.com/apache/arrow/blob/130f9e981aa98c25de5f5bfe55185db270cec313/format/FlightSql.proto#LL1202C1-L1202C1
+ let sort_cols = filtered_batch.project(&[0, 1, 2, 3])?;
+ let indices = lexsort_to_indices(sort_cols.columns());
+ let columns = filtered_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+
+ Ok(RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ columns,
+ )?)
+ }
+}
+
+/// The schema for GetTables without `table_schema` column
+static GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ Field::new("table_name", DataType::Utf8, false),
+ Field::new("table_type", DataType::Utf8, false),
+ ]))
+});
+
+/// The schema for GetTables with `table_schema` column
+static GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ Field::new("table_name", DataType::Utf8, false),
+ Field::new("table_type", DataType::Utf8, false),
+ Field::new("table_schema", DataType::Binary, false),
+ ]))
+});
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{StringArray, UInt32Array};
+
+ fn get_ref_batch() -> RecordBatch {
+ RecordBatch::try_new(
+ get_tables_schema(false),
+ vec![
+ Arc::new(StringArray::from(vec![
+ "a_catalog",
+ "a_catalog",
+ "a_catalog",
+ "a_catalog",
+ "b_catalog",
+ "b_catalog",
+ "b_catalog",
+ "b_catalog",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "a_schema", "a_schema", "b_schema", "b_schema", "a_schema",
+ "a_schema", "b_schema", "b_schema",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "a_table", "b_table", "a_table", "b_table", "a_table", "a_table",
+ "b_table", "b_table",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "TABLE", "TABLE", "TABLE", "TABLE", "TABLE", "VIEW", "TABLE", "VIEW",
+ ])) as ArrayRef,
+ ],
+ )
+ .unwrap()
+ }
+
+ fn get_ref_builder(
+ catalog: Option<&str>,
+ db_schema_filter_pattern: Option<&str>,
+ table_name_filter_pattern: Option<&str>,
+ table_types: Vec<&str>,
+ include_schema: bool,
+ ) -> GetTablesBuilder {
+ let dummy_schema = Schema::empty();
+ let tables = [
+ ("a_catalog", "a_schema", "a_table", "TABLE"),
+ ("a_catalog", "a_schema", "b_table", "TABLE"),
+ ("a_catalog", "b_schema", "a_table", "TABLE"),
+ ("a_catalog", "b_schema", "b_table", "TABLE"),
+ ("b_catalog", "a_schema", "a_table", "TABLE"),
+ ("b_catalog", "a_schema", "a_table", "VIEW"),
+ ("b_catalog", "b_schema", "b_table", "TABLE"),
+ ("b_catalog", "b_schema", "b_table", "VIEW"),
+ ];
+ let mut builder = GetTablesBuilder::new(
+ catalog,
+ db_schema_filter_pattern,
+ table_name_filter_pattern,
+ table_types,
+ include_schema,
+ );
+ for (catalog_name, schema_name, table_name, table_type) in tables {
+ builder
+ .append(
+ catalog_name,
+ schema_name,
+ table_name,
+ table_type,
+ &dummy_schema,
+ )
+ .unwrap();
+ }
+ builder
+ }
+
+ #[test]
+ fn test_tables_are_filtered() {
+ let ref_batch = get_ref_batch();
+
+ let builder = get_ref_builder(None, None, None, Vec::new(), false);
+ let table_batch = builder.build().unwrap();
+ assert_eq!(table_batch, ref_batch);
+
+ let builder = get_ref_builder(None, Some("a%"), Some("a%"), Vec::new(), false);
+ let table_batch = builder.build().unwrap();
+ let indices = UInt32Array::from(vec![0, 4, 5]);
+ let ref_filtered = RecordBatch::try_new(
+ get_tables_schema(false),
+ ref_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .unwrap(),
+ )
+ .unwrap();
+ assert_eq!(table_batch, ref_filtered);
+
+ let builder = get_ref_builder(Some("a_catalog"), None, None, Vec::new(), false);
+ let table_batch = builder.build().unwrap();
+ let indices = UInt32Array::from(vec![0, 1, 2, 3]);
+ let ref_filtered = RecordBatch::try_new(
+ get_tables_schema(false),
+ ref_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .unwrap(),
+ )
+ .unwrap();
+ assert_eq!(table_batch, ref_filtered);
+
+ let builder = get_ref_builder(None, None, None, vec!["VIEW"], false);
+ let table_batch = builder.build().unwrap();
+ let indices = UInt32Array::from(vec![5, 7]);
+ let ref_filtered = RecordBatch::try_new(
+ get_tables_schema(false),
+ ref_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .unwrap(),
+ )
+ .unwrap();
+ assert_eq!(table_batch, ref_filtered);
+ }
+
+ #[test]
+ fn test_tables_are_sorted() {
+ let ref_batch = get_ref_batch();
+ let dummy_schema = Schema::empty();
+
+ let tables = [
+ ("b_catalog", "a_schema", "a_table", "TABLE"),
+ ("b_catalog", "b_schema", "b_table", "TABLE"),
+ ("b_catalog", "b_schema", "b_table", "VIEW"),
+ ("b_catalog", "a_schema", "a_table", "VIEW"),
+ ("a_catalog", "a_schema", "a_table", "TABLE"),
+ ("a_catalog", "b_schema", "a_table", "TABLE"),
+ ("a_catalog", "b_schema", "b_table", "TABLE"),
+ ("a_catalog", "a_schema", "b_table", "TABLE"),
+ ];
+ let mut builder = GetTablesBuilder::new(
+ None::<String>,
+ None::<String>,
+ None::<String>,
+ None::<String>,
+ false,
+ );
+ for (catalog_name, schema_name, table_name, table_type) in tables {
+ builder
+ .append(
+ catalog_name,
+ schema_name,
+ table_name,
+ table_type,
+ &dummy_schema,
+ )
+ .unwrap();
+ }
+ let table_batch = builder.build().unwrap();
+ assert_eq!(table_batch, ref_batch);
+ }
+}
diff --git a/arrow-flight/src/sql/mod.rs b/arrow-flight/src/sql/mod.rs
index d73dc2809..212655d66 100644
--- a/arrow-flight/src/sql/mod.rs
+++ b/arrow-flight/src/sql/mod.rs
@@ -100,6 +100,7 @@ pub use gen::XdbcDatetimeSubcode;
pub use sql_info::SqlInfoList;
+pub mod catalogs;
pub mod client;
pub mod server;
pub mod sql_info;
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 9010c8d9a..5b9a1bb88 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -110,7 +110,7 @@
//! .map(|a| SortField::new(a.data_type().clone()))
//! .collect();
//! let mut converter = RowConverter::new(fields).unwrap();
-//! let rows = converter.convert_columns(&arrays).unwrap();
+//! let rows = converter.convert_columns(arrays).unwrap();
//! let mut sort: Vec<_> = rows.iter().enumerate().collect();
//! sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
//! UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))