You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/04/28 10:27:04 UTC
[arrow-datafusion] branch master updated: Allow table providers to indicate their type for catalog metadata (#205)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 14f1eeb Allow table providers to indicate their type for catalog metadata (#205)
14f1eeb is described below
commit 14f1eebef068a9e65f556ed74d2b6d98376c97f4
Author: Ruan Pearce-Authers <ru...@outlook.com>
AuthorDate: Wed Apr 28 11:26:58 2021 +0100
Allow table providers to indicate their type for catalog metadata (#205)
---
datafusion/src/catalog/information_schema.rs | 45 ++++++++++---------
datafusion/src/datasource/datasource.rs | 16 +++++++
datafusion/src/datasource/mod.rs | 2 +-
datafusion/src/execution/context.rs | 67 +++++++++++++++++++++++++++-
4 files changed, 105 insertions(+), 25 deletions(-)
diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs
index 5a7b9d5..fd7fcb4 100644
--- a/datafusion/src/catalog/information_schema.rs
+++ b/datafusion/src/catalog/information_schema.rs
@@ -27,7 +27,7 @@ use arrow::{
record_batch::RecordBatch,
};
-use crate::datasource::{MemTable, TableProvider};
+use crate::datasource::{MemTable, TableProvider, TableType};
use super::{
catalog::{CatalogList, CatalogProvider},
@@ -105,14 +105,25 @@ impl InformationSchemaProvider {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
- builder.add_base_table(&catalog_name, &schema_name, table_name)
+ let table = schema.table(&table_name).unwrap();
+ builder.add_table(
+ &catalog_name,
+ &schema_name,
+ table_name,
+ table.table_type(),
+ );
}
}
}
// Add a final list for the information schema tables themselves
- builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, TABLES);
- builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, COLUMNS);
+ builder.add_table(&catalog_name, INFORMATION_SCHEMA, TABLES, TableType::View);
+ builder.add_table(
+ &catalog_name,
+ INFORMATION_SCHEMA,
+ COLUMNS,
+ TableType::View,
+ );
}
let mem_table: MemTable = builder.into();
@@ -198,11 +209,12 @@ impl InformationSchemaTablesBuilder {
}
}
- fn add_base_table(
+ fn add_table(
&mut self,
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
table_name: impl AsRef<str>,
+ table_type: TableType,
) {
// Note: append_value is actually infallable.
self.catalog_names
@@ -212,24 +224,13 @@ impl InformationSchemaTablesBuilder {
.append_value(schema_name.as_ref())
.unwrap();
self.table_names.append_value(table_name.as_ref()).unwrap();
- self.table_types.append_value("BASE TABLE").unwrap();
- }
-
- fn add_system_table(
- &mut self,
- catalog_name: impl AsRef<str>,
- schema_name: impl AsRef<str>,
- table_name: impl AsRef<str>,
- ) {
- // Note: append_value is actually infallable.
- self.catalog_names
- .append_value(catalog_name.as_ref())
+ self.table_types
+ .append_value(match table_type {
+ TableType::Base => "BASE TABLE",
+ TableType::View => "VIEW",
+ TableType::Temporary => "LOCAL TEMPORARY",
+ })
.unwrap();
- self.schema_names
- .append_value(schema_name.as_ref())
- .unwrap();
- self.table_names.append_value(table_name.as_ref()).unwrap();
- self.table_types.append_value("VIEW").unwrap();
}
}
diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs
index e2b0733..0349a49 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -66,6 +66,17 @@ pub enum TableProviderFilterPushDown {
Exact,
}
+/// Indicates the type of this table for metadata/catalog purposes.
+#[derive(Debug, Clone, Copy)]
+pub enum TableType {
+ /// An ordinary physical table.
+ Base,
+ /// A non-materialised table that itself uses a query internally to provide data.
+ View,
+ /// A transient table.
+ Temporary,
+}
+
/// Source table
pub trait TableProvider: Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
@@ -75,6 +86,11 @@ pub trait TableProvider: Sync + Send {
/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;
+ /// Get the type of this table for metadata/catalog purposes.
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
/// Create an ExecutionPlan that will scan the table.
fn scan(
&self,
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 099098d..ac2f3d2 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -24,5 +24,5 @@ pub mod memory;
pub mod parquet;
pub use self::csv::{CsvFile, CsvReadOptions};
-pub use self::datasource::TableProvider;
+pub use self::datasource::{TableProvider, TableType};
pub use self::memory::MemTable;
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index c394d38..d25e7cc 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -840,10 +840,11 @@ mod tests {
use crate::variable::VarType;
use crate::{
assert_batches_eq, assert_batches_sorted_eq,
- logical_plan::{col, create_udf, sum},
+ logical_plan::{col, create_udf, sum, Expr},
};
use crate::{
- datasource::MemTable, logical_plan::create_udaf,
+ datasource::{MemTable, TableType},
+ logical_plan::create_udaf,
physical_plan::expressions::AvgAccumulator,
};
use arrow::array::{
@@ -2632,6 +2633,68 @@ mod tests {
}
#[tokio::test]
+ async fn information_schema_tables_table_types() {
+ struct TestTable(TableType);
+
+ impl TableProvider for TestTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn table_type(&self) -> TableType {
+ self.0
+ }
+
+ fn schema(&self) -> SchemaRef {
+ unimplemented!()
+ }
+
+ fn scan(
+ &self,
+ _: &Option<Vec<usize>>,
+ _: usize,
+ _: &[Expr],
+ _: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ unimplemented!()
+ }
+
+ fn statistics(&self) -> crate::datasource::datasource::Statistics {
+ unimplemented!()
+ }
+ }
+
+ let mut ctx = ExecutionContext::with_config(
+ ExecutionConfig::new().with_information_schema(true),
+ );
+
+ ctx.register_table("physical", Arc::new(TestTable(TableType::Base)))
+ .unwrap();
+ ctx.register_table("query", Arc::new(TestTable(TableType::View)))
+ .unwrap();
+ ctx.register_table("temp", Arc::new(TestTable(TableType::Temporary)))
+ .unwrap();
+
+ let result =
+ plan_and_collect(&mut ctx, "SELECT * from information_schema.tables")
+ .await
+ .unwrap();
+
+ let expected = vec![
+ "+---------------+--------------------+------------+-----------------+",
+ "| table_catalog | table_schema | table_name | table_type |",
+ "+---------------+--------------------+------------+-----------------+",
+ "| datafusion | information_schema | tables | VIEW |",
+ "| datafusion | information_schema | columns | VIEW |",
+ "| datafusion | public | physical | BASE TABLE |",
+ "| datafusion | public | query | VIEW |",
+ "| datafusion | public | temp | LOCAL TEMPORARY |",
+ "+---------------+--------------------+------------+-----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &result);
+ }
+
+ #[tokio::test]
async fn information_schema_show_tables_no_information_schema() {
let mut ctx = ExecutionContext::with_config(ExecutionConfig::new());