You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/07 12:02:18 UTC
[arrow-datafusion] branch master updated: use `Weak` ptr to break
catalog list <> info schema cyclic reference (#681)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3664766 use `Weak` ptr to break catalog list <> info schema cyclic reference (#681)
3664766 is described below
commit 36647662c69b2635cce300b03e5462b39bacd2a4
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Wed Jul 7 14:02:12 2021 +0200
use `Weak` ptr to break catalog list <> info schema cyclic reference (#681)
Fixes #680.
---
datafusion/src/catalog/information_schema.rs | 16 ++++++++++------
datafusion/src/execution/context.rs | 28 ++++++++++++++++++++++++++--
2 files changed, 36 insertions(+), 8 deletions(-)
diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs
index fd7fcb4..cd1e612 100644
--- a/datafusion/src/catalog/information_schema.rs
+++ b/datafusion/src/catalog/information_schema.rs
@@ -19,7 +19,10 @@
//!
//! Information Schema](https://en.wikipedia.org/wiki/Information_schema)
-use std::{any, sync::Arc};
+use std::{
+ any,
+ sync::{Arc, Weak},
+};
use arrow::{
array::{StringBuilder, UInt64Builder},
@@ -41,14 +44,14 @@ const COLUMNS: &str = "columns";
/// Wraps another [`CatalogProvider`] and adds a "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
- catalog_list: Arc<dyn CatalogList>,
+ catalog_list: Weak<dyn CatalogList>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}
impl CatalogWithInformationSchema {
pub(crate) fn new(
- catalog_list: Arc<dyn CatalogList>,
+ catalog_list: Weak<dyn CatalogList>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
@@ -73,9 +76,10 @@ impl CatalogProvider for CatalogWithInformationSchema {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
- Some(Arc::new(InformationSchemaProvider {
- catalog_list: self.catalog_list.clone(),
- }))
+ Weak::upgrade(&self.catalog_list).map(|catalog_list| {
+ Arc::new(InformationSchemaProvider { catalog_list })
+ as Arc<dyn SchemaProvider>
+ })
} else {
self.inner.schema(name)
}
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index d5a8486..6a26e04 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -144,7 +144,7 @@ impl ExecutionContext {
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
- catalog_list.clone(),
+ Arc::downgrade(&catalog_list),
Arc::new(default_catalog),
))
} else {
@@ -346,7 +346,7 @@ impl ExecutionContext {
let state = self.state.lock().unwrap();
let catalog = if state.config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
- state.catalog_list.clone(),
+ Arc::downgrade(&state.catalog_list),
catalog,
))
} else {
@@ -924,6 +924,7 @@ mod tests {
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use std::fs::File;
+ use std::sync::Weak;
use std::thread::{self, JoinHandle};
use std::{io::prelude::*, sync::Mutex};
use tempfile::TempDir;
@@ -3364,6 +3365,29 @@ mod tests {
assert_batches_sorted_eq!(expected, &result);
}
+ #[tokio::test]
+ async fn catalogs_not_leaked() {
+ // the information schema used to introduce cyclic Arcs
+ let ctx = ExecutionContext::with_config(
+ ExecutionConfig::new().with_information_schema(true),
+ );
+
+ // register a single catalog
+ let catalog = Arc::new(MemoryCatalogProvider::new());
+ let catalog_weak = Arc::downgrade(&catalog);
+ ctx.register_catalog("my_catalog", catalog);
+
+ let catalog_list_weak = {
+ let state = ctx.state.lock().unwrap();
+ Arc::downgrade(&state.catalog_list)
+ };
+
+ drop(ctx);
+
+ assert_eq!(Weak::strong_count(&catalog_list_weak), 0);
+ assert_eq!(Weak::strong_count(&catalog_weak), 0);
+ }
+
struct MyPhysicalPlanner {}
impl PhysicalPlanner for MyPhysicalPlanner {