You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/07 12:02:18 UTC

[arrow-datafusion] branch master updated: use `Weak` ptr to break catalog list <> info schema cyclic reference (#681)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3664766  use `Weak` ptr to break catalog list <> info schema cyclic reference (#681)
3664766 is described below

commit 36647662c69b2635cce300b03e5462b39bacd2a4
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Wed Jul 7 14:02:12 2021 +0200

    use `Weak` ptr to break catalog list <> info schema cyclic reference (#681)
    
    Fixes #680.
---
 datafusion/src/catalog/information_schema.rs | 16 ++++++++++------
 datafusion/src/execution/context.rs          | 28 ++++++++++++++++++++++++++--
 2 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs
index fd7fcb4..cd1e612 100644
--- a/datafusion/src/catalog/information_schema.rs
+++ b/datafusion/src/catalog/information_schema.rs
@@ -19,7 +19,10 @@
 //!
 //! Information Schema](https://en.wikipedia.org/wiki/Information_schema)
 
-use std::{any, sync::Arc};
+use std::{
+    any,
+    sync::{Arc, Weak},
+};
 
 use arrow::{
     array::{StringBuilder, UInt64Builder},
@@ -41,14 +44,14 @@ const COLUMNS: &str = "columns";
 /// Wraps another [`CatalogProvider`] and adds a "information_schema"
 /// schema that can introspect on tables in the catalog_list
 pub(crate) struct CatalogWithInformationSchema {
-    catalog_list: Arc<dyn CatalogList>,
+    catalog_list: Weak<dyn CatalogList>,
     /// wrapped provider
     inner: Arc<dyn CatalogProvider>,
 }
 
 impl CatalogWithInformationSchema {
     pub(crate) fn new(
-        catalog_list: Arc<dyn CatalogList>,
+        catalog_list: Weak<dyn CatalogList>,
         inner: Arc<dyn CatalogProvider>,
     ) -> Self {
         Self {
@@ -73,9 +76,10 @@ impl CatalogProvider for CatalogWithInformationSchema {
 
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
-            Some(Arc::new(InformationSchemaProvider {
-                catalog_list: self.catalog_list.clone(),
-            }))
+            Weak::upgrade(&self.catalog_list).map(|catalog_list| {
+                Arc::new(InformationSchemaProvider { catalog_list })
+                    as Arc<dyn SchemaProvider>
+            })
         } else {
             self.inner.schema(name)
         }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index d5a8486..6a26e04 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -144,7 +144,7 @@ impl ExecutionContext {
 
             let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
                 Arc::new(CatalogWithInformationSchema::new(
-                    catalog_list.clone(),
+                    Arc::downgrade(&catalog_list),
                     Arc::new(default_catalog),
                 ))
             } else {
@@ -346,7 +346,7 @@ impl ExecutionContext {
         let state = self.state.lock().unwrap();
         let catalog = if state.config.information_schema {
             Arc::new(CatalogWithInformationSchema::new(
-                state.catalog_list.clone(),
+                Arc::downgrade(&state.catalog_list),
                 catalog,
             ))
         } else {
@@ -924,6 +924,7 @@ mod tests {
     use arrow::datatypes::*;
     use arrow::record_batch::RecordBatch;
     use std::fs::File;
+    use std::sync::Weak;
     use std::thread::{self, JoinHandle};
     use std::{io::prelude::*, sync::Mutex};
     use tempfile::TempDir;
@@ -3364,6 +3365,29 @@ mod tests {
         assert_batches_sorted_eq!(expected, &result);
     }
 
+    #[tokio::test]
+    async fn catalogs_not_leaked() {
+        // the information schema used to introduce cyclic Arcs
+        let ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_information_schema(true),
+        );
+
+        // register a single catalog
+        let catalog = Arc::new(MemoryCatalogProvider::new());
+        let catalog_weak = Arc::downgrade(&catalog);
+        ctx.register_catalog("my_catalog", catalog);
+
+        let catalog_list_weak = {
+            let state = ctx.state.lock().unwrap();
+            Arc::downgrade(&state.catalog_list)
+        };
+
+        drop(ctx);
+
+        assert_eq!(Weak::strong_count(&catalog_list_weak), 0);
+        assert_eq!(Weak::strong_count(&catalog_weak), 0);
+    }
+
     struct MyPhysicalPlanner {}
 
     impl PhysicalPlanner for MyPhysicalPlanner {