You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/04/28 10:27:04 UTC

[arrow-datafusion] branch master updated: Allow table providers to indicate their type for catalog metadata (#205)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 14f1eeb  Allow table providers to indicate their type for catalog metadata (#205)
14f1eeb is described below

commit 14f1eebef068a9e65f556ed74d2b6d98376c97f4
Author: Ruan Pearce-Authers <ru...@outlook.com>
AuthorDate: Wed Apr 28 11:26:58 2021 +0100

    Allow table providers to indicate their type for catalog metadata (#205)
---
 datafusion/src/catalog/information_schema.rs | 45 ++++++++++---------
 datafusion/src/datasource/datasource.rs      | 16 +++++++
 datafusion/src/datasource/mod.rs             |  2 +-
 datafusion/src/execution/context.rs          | 67 +++++++++++++++++++++++++++-
 4 files changed, 105 insertions(+), 25 deletions(-)

diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs
index 5a7b9d5..fd7fcb4 100644
--- a/datafusion/src/catalog/information_schema.rs
+++ b/datafusion/src/catalog/information_schema.rs
@@ -27,7 +27,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
-use crate::datasource::{MemTable, TableProvider};
+use crate::datasource::{MemTable, TableProvider, TableType};
 
 use super::{
     catalog::{CatalogList, CatalogProvider},
@@ -105,14 +105,25 @@ impl InformationSchemaProvider {
                 if schema_name != INFORMATION_SCHEMA {
                     let schema = catalog.schema(&schema_name).unwrap();
                     for table_name in schema.table_names() {
-                        builder.add_base_table(&catalog_name, &schema_name, table_name)
+                        let table = schema.table(&table_name).unwrap();
+                        builder.add_table(
+                            &catalog_name,
+                            &schema_name,
+                            table_name,
+                            table.table_type(),
+                        );
                     }
                 }
             }
 
             // Add a final list for the information schema tables themselves
-            builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, TABLES);
-            builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, COLUMNS);
+            builder.add_table(&catalog_name, INFORMATION_SCHEMA, TABLES, TableType::View);
+            builder.add_table(
+                &catalog_name,
+                INFORMATION_SCHEMA,
+                COLUMNS,
+                TableType::View,
+            );
         }
 
         let mem_table: MemTable = builder.into();
@@ -198,11 +209,12 @@ impl InformationSchemaTablesBuilder {
         }
     }
 
-    fn add_base_table(
+    fn add_table(
         &mut self,
         catalog_name: impl AsRef<str>,
         schema_name: impl AsRef<str>,
         table_name: impl AsRef<str>,
+        table_type: TableType,
     ) {
         // Note: append_value is actually infallible.
         self.catalog_names
@@ -212,24 +224,13 @@ impl InformationSchemaTablesBuilder {
             .append_value(schema_name.as_ref())
             .unwrap();
         self.table_names.append_value(table_name.as_ref()).unwrap();
-        self.table_types.append_value("BASE TABLE").unwrap();
-    }
-
-    fn add_system_table(
-        &mut self,
-        catalog_name: impl AsRef<str>,
-        schema_name: impl AsRef<str>,
-        table_name: impl AsRef<str>,
-    ) {
-        // Note: append_value is actually infallable.
-        self.catalog_names
-            .append_value(catalog_name.as_ref())
+        self.table_types
+            .append_value(match table_type {
+                TableType::Base => "BASE TABLE",
+                TableType::View => "VIEW",
+                TableType::Temporary => "LOCAL TEMPORARY",
+            })
             .unwrap();
-        self.schema_names
-            .append_value(schema_name.as_ref())
-            .unwrap();
-        self.table_names.append_value(table_name.as_ref()).unwrap();
-        self.table_types.append_value("VIEW").unwrap();
     }
 }
 
diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs
index e2b0733..0349a49 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -66,6 +66,17 @@ pub enum TableProviderFilterPushDown {
     Exact,
 }
 
+/// Indicates the type of a table for metadata/catalog purposes (the `table_type` column of `information_schema.tables`).
+#[derive(Debug, Clone, Copy)]
+pub enum TableType {
+    /// An ordinary physical table; listed as `BASE TABLE`.
+    Base,
+    /// A non-materialised table that itself uses a query internally to provide data; listed as `VIEW`.
+    View,
+    /// A transient table; listed as `LOCAL TEMPORARY`.
+    Temporary,
+}
+
 /// Source table
 pub trait TableProvider: Sync + Send {
     /// Returns the table provider as [`Any`](std::any::Any) so that it can be
@@ -75,6 +86,11 @@ pub trait TableProvider: Sync + Send {
     /// Get a reference to the schema for this table
     fn schema(&self) -> SchemaRef;
 
+    /// Get the type of this table for metadata/catalog purposes; providers that do not override this report `TableType::Base`.
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     /// Create an ExecutionPlan that will scan the table.
     fn scan(
         &self,
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 099098d..ac2f3d2 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -24,5 +24,5 @@ pub mod memory;
 pub mod parquet;
 
 pub use self::csv::{CsvFile, CsvReadOptions};
-pub use self::datasource::TableProvider;
+pub use self::datasource::{TableProvider, TableType};
 pub use self::memory::MemTable;
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index c394d38..d25e7cc 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -840,10 +840,11 @@ mod tests {
     use crate::variable::VarType;
     use crate::{
         assert_batches_eq, assert_batches_sorted_eq,
-        logical_plan::{col, create_udf, sum},
+        logical_plan::{col, create_udf, sum, Expr},
     };
     use crate::{
-        datasource::MemTable, logical_plan::create_udaf,
+        datasource::{MemTable, TableType},
+        logical_plan::create_udaf,
         physical_plan::expressions::AvgAccumulator,
     };
     use arrow::array::{
@@ -2632,6 +2633,68 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn information_schema_tables_table_types() {
+        struct TestTable(TableType);
+
+        impl TableProvider for TestTable {
+            fn as_any(&self) -> &dyn std::any::Any {
+                self
+            }
+
+            fn table_type(&self) -> TableType {
+                self.0
+            }
+
+            fn schema(&self) -> SchemaRef {
+                unimplemented!()
+            }
+
+            fn scan(
+                &self,
+                _: &Option<Vec<usize>>,
+                _: usize,
+                _: &[Expr],
+                _: Option<usize>,
+            ) -> Result<Arc<dyn ExecutionPlan>> {
+                unimplemented!()
+            }
+
+            fn statistics(&self) -> crate::datasource::datasource::Statistics {
+                unimplemented!()
+            }
+        }
+
+        let mut ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_information_schema(true),
+        );
+
+        ctx.register_table("physical", Arc::new(TestTable(TableType::Base)))
+            .unwrap();
+        ctx.register_table("query", Arc::new(TestTable(TableType::View)))
+            .unwrap();
+        ctx.register_table("temp", Arc::new(TestTable(TableType::Temporary)))
+            .unwrap();
+
+        let result =
+            plan_and_collect(&mut ctx, "SELECT * from information_schema.tables")
+                .await
+                .unwrap();
+
+        let expected = vec![
+            "+---------------+--------------------+------------+-----------------+",
+            "| table_catalog | table_schema       | table_name | table_type      |",
+            "+---------------+--------------------+------------+-----------------+",
+            "| datafusion    | information_schema | tables     | VIEW            |",
+            "| datafusion    | information_schema | columns    | VIEW            |",
+            "| datafusion    | public             | physical   | BASE TABLE      |",
+            "| datafusion    | public             | query      | VIEW            |",
+            "| datafusion    | public             | temp       | LOCAL TEMPORARY |",
+            "+---------------+--------------------+------------+-----------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &result);
+    }
+
+    #[tokio::test]
     async fn information_schema_show_tables_no_information_schema() {
         let mut ctx = ExecutionContext::with_config(ExecutionConfig::new());