Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/10 11:19:12 UTC

[arrow-datafusion] branch master updated: Create ListingTableConfig which includes file format and schema inference (#1715)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e5f6969  Create ListingTableConfig which includes file format and schema inference (#1715)
e5f6969 is described below

commit e5f69695da103ceab60f8410d129e4e7a2d5bca7
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Thu Feb 10 06:19:08 2022 -0500

    Create ListingTableConfig which includes file format and schema inference (#1715)
    
    * Starting work on ListingTableConfig
    
    * v1 of config
    
    * Refactor for cleaner api
    
    * Update references in rest of code base
    
    * Docs
    
    * Clippy
    
    * Remove async from ListingTable::try_new
    
    * Ballista
    
    * Benchmark fix
    
    * benchs
    
    * Clippy
    
    * Clippy
    
    * Fix empty table bug
    
    * Fix table path
    
    * Get file from directory
    
    * Fix read empty table test
    
    * Remove unnecessary async
    
    * Rebase
    
    * Improve log
    
    * Rebase fix
---
 .../rust/core/src/serde/logical_plan/from_proto.rs |   2 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   |  13 +-
 benchmarks/src/bin/tpch.rs                         |  24 +--
 datafusion/benches/sort_limit_query_sql.rs         |  18 +-
 datafusion/src/datasource/listing/mod.rs           |   2 +-
 datafusion/src/datasource/listing/table.rs         | 200 +++++++++++++++++----
 datafusion/src/execution/context.rs                |   7 +-
 datafusion/src/logical_plan/builder.rs             |  21 ++-
 datafusion/tests/path_partition.rs                 |  15 +-
 9 files changed, 222 insertions(+), 80 deletions(-)
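
For reviewers, the shape of the change: the four-argument `ListingTable::new` constructor is replaced throughout by a builder-style `ListingTableConfig` plus a fallible `ListingTable::try_new`. A minimal sketch of the explicit (non-inferring) call pattern against this commit's API; the `data/example.csv` path is illustrative:

    use std::sync::Arc;

    use datafusion::datasource::file_format::csv::CsvFormat;
    use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
    use datafusion::datasource::object_store::local::LocalFileSystem;
    use datafusion::error::Result;

    async fn build_csv_table() -> Result<Arc<ListingTable>> {
        let object_store = Arc::new(LocalFileSystem {});
        let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
        // Resolve the file schema up front; it must describe the files only,
        // without any table partitioning columns.
        let schema = listing_options
            .infer_schema(object_store.clone(), "data/example.csv")
            .await?;
        let config = ListingTableConfig::new(object_store, "data/example.csv")
            .with_listing_options(listing_options)
            .with_schema(schema);
        Ok(Arc::new(ListingTable::try_new(config)?))
    }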

diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 5951376..fb900e1 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, SizedFile};
 use datafusion::logical_plan::window_frames::{
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 31c3308..3166a48 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -27,7 +27,7 @@ use datafusion::datasource::file_format::avro::AvroFormat;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::logical_plan::plan::Extension;
 use datafusion::logical_plan::plan::{
@@ -241,12 +241,11 @@ impl AsLogicalPlan for LogicalPlanNode {
                     scan.path.as_str()
                 );
 
-                let provider = ListingTable::new(
-                    object_store,
-                    scan.path.clone(),
-                    Arc::new(schema),
-                    options,
-                );
+                let config = ListingTableConfig::new(object_store, scan.path.as_str())
+                    .with_listing_options(options)
+                    .with_schema(Arc::new(schema));
+
+                let provider = ListingTable::try_new(config)?;
 
                 LogicalPlanBuilder::scan_with_filters(
                     &scan.table_name,
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 3880ca6..93c44ef 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -51,7 +51,7 @@ use datafusion::{
 use datafusion::{
     arrow::util::pretty,
     datasource::{
-        listing::{ListingOptions, ListingTable},
+        listing::{ListingOptions, ListingTable, ListingTableConfig},
         object_store::local::LocalFileSystem,
     },
 };
@@ -724,12 +724,11 @@ fn get_table(
         table_partition_cols: vec![],
     };
 
-    Ok(Arc::new(ListingTable::new(
-        Arc::new(LocalFileSystem {}),
-        path,
-        schema,
-        options,
-    )))
+    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
+        .with_listing_options(options)
+        .with_schema(schema);
+
+    Ok(Arc::new(ListingTable::try_new(config)?))
 }
 
 fn get_schema(table: &str) -> Schema {
@@ -1389,12 +1388,13 @@ mod tests {
                     .has_header(false)
                     .file_extension(".tbl");
                 let listing_options = options.to_listing_options(1);
-                let provider = ListingTable::new(
+                let config = ListingTableConfig::new(
                     Arc::new(LocalFileSystem {}),
-                    format!("{}/{}.tbl", tpch_data_path, table),
-                    Arc::new(schema),
-                    listing_options,
-                );
+                    tpch_data_path.clone(),
+                )
+                .with_listing_options(listing_options)
+                .with_schema(Arc::new(schema));
+                let provider = ListingTable::try_new(config)?;
                 ctx.register_table(table, Arc::new(provider))?;
             }
 
diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs
index 3828014..2434341 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -19,7 +19,7 @@
 extern crate criterion;
 use criterion::Criterion;
 use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
 use datafusion::datasource::object_store::local::LocalFileSystem;
 
 use parking_lot::Mutex;
@@ -63,14 +63,16 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
 
     let testdata = datafusion::test_util::arrow_test_data();
 
+    let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+
     // create CSV data source
     let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
-    let csv = ListingTable::new(
-        Arc::new(LocalFileSystem {}),
-        format!("{}/csv/aggregate_test_100.csv", testdata),
-        schema,
-        listing_options,
-    );
+
+    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
+        .with_listing_options(listing_options)
+        .with_schema(schema);
+
+    let csv = async { ListingTable::try_new(config).unwrap() };
 
     let rt = Runtime::new().unwrap();
 
@@ -85,7 +87,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
         ctx.state.lock().config.target_partitions = 1;
         let runtime = ctx.state.lock().runtime_env.clone();
 
-        let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
+        let mem_table = MemTable::load(Arc::new(csv.await), Some(partitions), runtime)
             .await
             .unwrap();
         ctx.register_table("aggregate_test_100", Arc::new(mem_table))
diff --git a/datafusion/src/datasource/listing/mod.rs b/datafusion/src/datasource/listing/mod.rs
index c8b9241..2f1d034 100644
--- a/datafusion/src/datasource/listing/mod.rs
+++ b/datafusion/src/datasource/listing/mod.rs
@@ -21,4 +21,4 @@
 mod helpers;
 mod table;
 
-pub use table::{ListingOptions, ListingTable};
+pub use table::{ListingOptions, ListingTable, ListingTableConfig};
diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs
index 1501b8b..3fbd6c1 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -24,7 +24,11 @@ use async_trait::async_trait;
 use futures::StreamExt;
 
 use crate::{
-    error::Result,
+    datasource::file_format::avro::AvroFormat,
+    datasource::file_format::csv::CsvFormat,
+    datasource::file_format::json::JsonFormat,
+    datasource::file_format::parquet::ParquetFormat,
+    error::{DataFusionError, Result},
     logical_plan::Expr,
     physical_plan::{
         empty::EmptyExec,
@@ -40,7 +44,124 @@ use crate::datasource::{
 
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
 
+/// Configuration for creating a `ListingTable`
+pub struct ListingTableConfig {
+    /// `ObjectStore` that contains the files for the `ListingTable`.
+    pub object_store: Arc<dyn ObjectStore>,
+    /// Path on the `ObjectStore` for creating `ListingTable`.
+    pub table_path: String,
+    /// Optional `SchemaRef` for the `ListingTable` to be created.
+    pub file_schema: Option<SchemaRef>,
+    /// Optional `ListingOptions` for the `ListingTable` to be created.
+    pub options: Option<ListingOptions>,
+}
+
+impl ListingTableConfig {
+    /// Creates a new `ListingTableConfig`. The `SchemaRef` and `ListingOptions` start unset and can be inferred later via `infer`.
+    pub fn new(
+        object_store: Arc<dyn ObjectStore>,
+        table_path: impl Into<String>,
+    ) -> Self {
+        Self {
+            object_store,
+            table_path: table_path.into(),
+            file_schema: None,
+            options: None,
+        }
+    }
+    /// Add `schema` to `ListingTableConfig`
+    pub fn with_schema(self, schema: SchemaRef) -> Self {
+        Self {
+            object_store: self.object_store,
+            table_path: self.table_path,
+            file_schema: Some(schema),
+            options: self.options,
+        }
+    }
+
+    /// Add `listing_options` to `ListingTableConfig`
+    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
+        Self {
+            object_store: self.object_store,
+            table_path: self.table_path,
+            file_schema: self.file_schema,
+            options: Some(listing_options),
+        }
+    }
+
+    fn infer_format(suffix: &str) -> Result<Arc<dyn FileFormat>> {
+        match suffix {
+            "avro" => Ok(Arc::new(AvroFormat::default())),
+            "csv" => Ok(Arc::new(CsvFormat::default())),
+            "json" => Ok(Arc::new(JsonFormat::default())),
+            "parquet" => Ok(Arc::new(ParquetFormat::default())),
+            _ => Err(DataFusionError::Internal(format!(
+                "Unable to infer file type from suffix {}",
+                suffix
+            ))),
+        }
+    }
+
+    /// Infer `ListingOptions` from the suffix of the first file listed under `table_path`.
+    pub async fn infer_options(self) -> Result<Self> {
+        let mut files = self.object_store.list_file(&self.table_path).await?;
+        let file = files
+            .next()
+            .await
+            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
+
+        let tokens: Vec<&str> = file.path().split('.').collect();
+        let file_type = tokens.last().ok_or_else(|| {
+            DataFusionError::Internal("Unable to infer file suffix".into())
+        })?;
+
+        let format = ListingTableConfig::infer_format(*file_type)?;
+
+        let listing_options = ListingOptions {
+            format,
+            collect_stat: true,
+            file_extension: file_type.to_string(),
+            target_partitions: num_cpus::get(),
+            table_partition_cols: vec![],
+        };
+
+        Ok(Self {
+            object_store: self.object_store,
+            table_path: self.table_path,
+            file_schema: self.file_schema,
+            options: Some(listing_options),
+        })
+    }
+
+    /// Infer the `SchemaRef` for the files under `table_path` using the `FileFormat` in `self.options`. Requires `self.options` to be set beforehand.
+    pub async fn infer_schema(self) -> Result<Self> {
+        match self.options {
+            Some(options) => {
+                let schema = options
+                    .infer_schema(self.object_store.clone(), self.table_path.as_str())
+                    .await?;
+
+                Ok(Self {
+                    object_store: self.object_store,
+                    table_path: self.table_path,
+                    file_schema: Some(schema),
+                    options: Some(options),
+                })
+            }
+            None => Err(DataFusionError::Internal(
+                "No `ListingOptions` set for inferring schema".into(),
+            )),
+        }
+    }
+
+    /// Convenience wrapper for calling `infer_options` and `infer_schema`
+    pub async fn infer(self) -> Result<Self> {
+        self.infer_options().await?.infer_schema().await
+    }
+}
+
 /// Options for creating a `ListingTable`
+#[derive(Clone)]
 pub struct ListingOptions {
     /// A suffix on which files should be filtered (leave empty to
     /// keep all files on the path)
@@ -115,15 +236,21 @@ pub struct ListingTable {
 
 impl ListingTable {
     /// Create new table that lists the FS to get the files to scan.
-    /// The provided `schema` must be resolved before creating the table
+    /// Takes a `ListingTableConfig` as input, which requires an `ObjectStore` and `table_path`.
+    /// `ListingOptions` and `SchemaRef` are optional; if either is missing, set it
+    /// explicitly or infer it first via `ListingTableConfig::infer`, otherwise `try_new` returns an error.
+    /// A provided schema must be resolved before creating the table
     /// and should contain the fields of the file without the table
     /// partitioning columns.
-    pub fn new(
-        object_store: Arc<dyn ObjectStore>,
-        table_path: String,
-        file_schema: SchemaRef,
-        options: ListingOptions,
-    ) -> Self {
+    pub fn try_new(config: ListingTableConfig) -> Result<Self> {
+        let file_schema = config
+            .file_schema
+            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
+
+        let options = config.options.ok_or_else(|| {
+            DataFusionError::Internal("No ListingOptions provided".into())
+        })?;
+
         // Add the partition columns to the file schema
         let mut table_fields = file_schema.fields().clone();
         for part in &options.table_partition_cols {
@@ -134,13 +261,15 @@ impl ListingTable {
             ));
         }
 
-        Self {
-            object_store,
-            table_path,
+        let table = Self {
+            object_store: config.object_store.clone(),
+            table_path: config.table_path.clone(),
             file_schema,
             table_schema: Arc::new(Schema::new(table_fields)),
             options,
-        }
+        };
+
+        Ok(table)
     }
 
     /// Get object store ref
@@ -264,10 +393,7 @@ impl ListingTable {
 
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::DataType;
-
     use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
-    use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
     use crate::{
         datasource::{
             file_format::{avro::AvroFormat, parquet::ParquetFormat},
@@ -276,6 +402,7 @@ mod tests {
         logical_plan::{col, lit},
         test::{columns, object_store::TestObjectStore},
     };
+    use arrow::datatypes::DataType;
 
     use super::*;
 
@@ -306,8 +433,10 @@ mod tests {
         let schema = opt
             .infer_schema(Arc::new(LocalFileSystem {}), &filename)
             .await?;
-        let table =
-            ListingTable::new(Arc::new(LocalFileSystem {}), filename, schema, opt);
+        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename)
+            .with_listing_options(opt)
+            .with_schema(schema);
+        let table = ListingTable::try_new(config)?;
         let exec = table.scan(&None, &[], None).await?;
         assert_eq!(exec.statistics().num_rows, Some(8));
         assert_eq!(exec.statistics().total_byte_size, Some(671));
@@ -317,7 +446,8 @@ mod tests {
 
     #[tokio::test]
     async fn read_empty_table() -> Result<()> {
-        let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 100)]);
+        let path = String::from("table/p1=v1/file.avro");
+        let store = TestObjectStore::new_arc(&[(&path, 100)]);
 
         let opt = ListingOptions {
             file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
@@ -327,10 +457,13 @@ mod tests {
             collect_stat: true,
         };
 
-        let file_schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+        let file_schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
+        let config = ListingTableConfig::new(store, "table/")
+            .with_listing_options(opt)
+            .with_schema(file_schema);
+        let table = ListingTable::try_new(config)?;
 
-        let table =
-            ListingTable::new(store, "table/".to_owned(), Arc::new(file_schema), opt);
         assert_eq!(
             columns(&table.schema()),
             vec!["a".to_owned(), "p1".to_owned()]
@@ -420,20 +553,10 @@ mod tests {
     async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, name);
-        let opt = ListingOptions {
-            file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
-            format: Arc::new(ParquetFormat::default()),
-            table_partition_cols: vec![],
-            target_partitions: 2,
-            collect_stat: true,
-        };
-        // here we resolve the schema locally
-        let schema = opt
-            .infer_schema(Arc::new(LocalFileSystem {}), &filename)
-            .await
-            .expect("Infer schema");
-        let table =
-            ListingTable::new(Arc::new(LocalFileSystem {}), filename, schema, opt);
+        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename)
+            .infer()
+            .await?;
+        let table = ListingTable::try_new(config)?;
         Ok(Arc::new(table))
     }
 
@@ -460,8 +583,11 @@ mod tests {
 
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
 
-        let table =
-            ListingTable::new(mock_store, table_prefix.to_owned(), Arc::new(schema), opt);
+        let config = ListingTableConfig::new(mock_store, table_prefix.to_owned())
+            .with_listing_options(opt)
+            .with_schema(Arc::new(schema));
+
+        let table = ListingTable::try_new(config)?;
 
         let (file_list, _) = table.list_files_for_scan(&[], None).await?;
 
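The `load_table` test above exercises the shortest path through the new API: when neither schema nor options are supplied, `infer()` fills in both. A sketch of that flow, assuming only that the path resolves on the local filesystem:

    use std::sync::Arc;

    use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
    use datafusion::datasource::object_store::local::LocalFileSystem;
    use datafusion::error::Result;

    async fn load_inferred(path: &str) -> Result<Arc<ListingTable>> {
        // infer() chains infer_options() and infer_schema(): the format is picked
        // from the suffix of the first file listed under `path` (avro, csv, json,
        // or parquet), then that format reads the schema from the files.
        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
            .infer()
            .await?;
        Ok(Arc::new(ListingTable::try_new(config)?))
    }
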
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index d4f1ca4..393e4af 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -56,6 +56,7 @@ use crate::catalog::{
     schema::{MemorySchemaProvider, SchemaProvider},
     ResolvedTableReference, TableReference,
 };
+use crate::datasource::listing::ListingTableConfig;
 use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
@@ -455,8 +456,10 @@ impl ExecutionContext {
             }
             Some(s) => s,
         };
-        let table =
-            ListingTable::new(object_store, path.to_owned(), resolved_schema, options);
+        let config = ListingTableConfig::new(object_store, path)
+            .with_listing_options(options)
+            .with_schema(resolved_schema);
+        let table = ListingTable::try_new(config)?;
         self.register_table(name, Arc::new(table))?;
         Ok(())
     }
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index a722238..0144b75 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -20,7 +20,7 @@
 use crate::datasource::{
     empty::EmptyTable,
     file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
-    listing::{ListingOptions, ListingTable},
+    listing::{ListingOptions, ListingTable, ListingTableConfig},
     object_store::ObjectStore,
     MemTable, TableProvider,
 };
@@ -243,8 +243,10 @@ impl LogicalPlanBuilder {
                     .await?
             }
         };
-        let provider =
-            ListingTable::new(object_store, path, resolved_schema, listing_options);
+        let config = ListingTableConfig::new(object_store, path)
+            .with_listing_options(listing_options)
+            .with_schema(resolved_schema);
+        let provider = ListingTable::try_new(config)?;
 
         Self::scan(table_name, Arc::new(provider), projection)
     }
@@ -293,8 +295,11 @@ impl LogicalPlanBuilder {
             .infer_schema(Arc::clone(&object_store), &path)
             .await?;
 
-        let provider =
-            ListingTable::new(object_store, path, resolved_schema, listing_options);
+        let config = ListingTableConfig::new(object_store, path)
+            .with_listing_options(listing_options)
+            .with_schema(resolved_schema);
+
+        let provider = ListingTable::try_new(config)?;
         Self::scan(table_name, Arc::new(provider), projection)
     }
 
@@ -339,8 +344,10 @@ impl LogicalPlanBuilder {
                     .await?
             }
         };
-        let provider =
-            ListingTable::new(object_store, path, resolved_schema, listing_options);
+        let config = ListingTableConfig::new(object_store, path)
+            .with_listing_options(listing_options)
+            .with_schema(resolved_schema);
+        let provider = ListingTable::try_new(config)?;
 
         Self::scan(table_name, Arc::new(provider), projection)
     }
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
index e68ef32..178e318 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -24,7 +24,7 @@ use datafusion::{
     assert_batches_sorted_eq,
     datasource::{
         file_format::{csv::CsvFormat, parquet::ParquetFormat},
-        listing::{ListingOptions, ListingTable},
+        listing::{ListingOptions, ListingTable, ListingTableConfig},
         object_store::{
             local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
             ObjectReader, ObjectStore, SizedFile,
@@ -285,8 +285,10 @@ fn register_partitioned_aggregate_csv(
     let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
 
-    let table =
-        ListingTable::new(object_store, table_path.to_owned(), file_schema, options);
+    let config = ListingTableConfig::new(object_store, table_path)
+        .with_listing_options(options)
+        .with_schema(file_schema);
+    let table = ListingTable::try_new(config).unwrap();
 
     ctx.register_table("t", Arc::new(table))
         .expect("registering listing table failed");
@@ -313,8 +315,11 @@ async fn register_partitioned_alltypes_parquet(
         .await
         .expect("Parquet schema inference failed");
 
-    let table =
-        ListingTable::new(object_store, table_path.to_owned(), file_schema, options);
+    let config = ListingTableConfig::new(object_store, table_path)
+        .with_listing_options(options)
+        .with_schema(file_schema);
+
+    let table = ListingTable::try_new(config).unwrap();
 
     ctx.register_table("t", Arc::new(table))
         .expect("registering listing table failed");