You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/06 12:47:35 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1924: Refactor ExecutionContext and related conf to support multi-tenancy configurations

alamb commented on a change in pull request #1924:
URL: https://github.com/apache/arrow-datafusion/pull/1924#discussion_r820227297



##########
File path: datafusion/src/execution/runtime_env.rs
##########
@@ -25,47 +25,110 @@ use crate::{
         memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
     },
 };
-
+use std::fmt;
 use std::fmt::{Debug, Formatter};
+
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
+use crate::execution::context::{
+    SessionContextRegistry, TaskContext, TaskContextRegistry, BATCH_SIZE,
+    PARQUET_PRUNING, REPARTITION_AGGREGATIONS, REPARTITION_JOINS, REPARTITION_WINDOWS,
+    TARGET_PARTITIONS,
+};
+use crate::prelude::{SessionConfig, SessionContext};
+use datafusion_common::DataFusionError;
+use once_cell::sync::OnceCell;
+use parking_lot::Mutex;
+use std::path::PathBuf;
 use std::sync::Arc;
 
-#[derive(Clone)]
-/// Execution runtime environment. This structure is passed to the
-/// physical plans when they are run.
+/// Global singleton RuntimeEnv
+pub static RUNTIME_ENV: OnceCell<Arc<RuntimeEnv>> = OnceCell::new();

Review comment:
       I think a single static runtime environment makes sense for Ballista but not for DataFusion, which is used in a variety of use cases for which a single runtime may not be appropriate.
   

##########
File path: datafusion/src/execution/runtime_env.rs
##########
@@ -25,47 +25,110 @@ use crate::{
         memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
     },
 };
-
+use std::fmt;
 use std::fmt::{Debug, Formatter};
+
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
+use crate::execution::context::{
+    SessionContextRegistry, TaskContext, TaskContextRegistry, BATCH_SIZE,
+    PARQUET_PRUNING, REPARTITION_AGGREGATIONS, REPARTITION_JOINS, REPARTITION_WINDOWS,
+    TARGET_PARTITIONS,
+};
+use crate::prelude::{SessionConfig, SessionContext};
+use datafusion_common::DataFusionError;
+use once_cell::sync::OnceCell;
+use parking_lot::Mutex;
+use std::path::PathBuf;
 use std::sync::Arc;
 
-#[derive(Clone)]
-/// Execution runtime environment. This structure is passed to the
-/// physical plans when they are run.
+/// Global singleton RuntimeEnv
+pub static RUNTIME_ENV: OnceCell<Arc<RuntimeEnv>> = OnceCell::new();
+
+/// Execution runtime environment. This structure is a singleton for each Scheduler/Executor instance.
 pub struct RuntimeEnv {
-    /// Default batch size while creating new batches
-    pub batch_size: usize,
+    /// Executor Id
+    pub executor_id: Option<String>,
+    /// Local Env
+    pub is_local: bool,
     /// Runtime memory management
     pub memory_manager: Arc<MemoryManager>,
     /// Manage temporary files during query execution
     pub disk_manager: Arc<DiskManager>,
+    /// Object Store that are registered within the Scheduler's or Executors' Runtime
+    pub object_store_registry: Arc<ObjectStoreRegistry>,
+    /// DataFusion task contexts that are registered within the Executors' Runtime
+    pub task_context_registry: Option<Arc<TaskContextRegistry>>,
+    /// DataFusion session contexts that are registered within the Scheduler's Runtime
+    pub session_context_registry: Option<Arc<SessionContextRegistry>>,
 }
 
-impl Debug for RuntimeEnv {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "RuntimeEnv")
+impl RuntimeEnv {
+    /// Create an executor env based on configuration
+    pub fn new_executor_env(config: RuntimeConfig, executor_id: String) -> Result<Self> {
+        let RuntimeConfig {
+            memory_manager,
+            disk_manager,
+        } = config;
+        Ok(Self {
+            executor_id: Some(executor_id),
+            is_local: false,
+            memory_manager: MemoryManager::new(memory_manager),
+            disk_manager: DiskManager::try_new(disk_manager)?,
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            task_context_registry: Some(Arc::new(TaskContextRegistry::new())),
+            session_context_registry: None,
+        })
     }
-}
 
-impl RuntimeEnv {
-    /// Create env based on configuration
-    pub fn new(config: RuntimeConfig) -> Result<Self> {
+    /// Create a scheduler env based on configuration
+    pub fn new_scheduler_env(config: RuntimeConfig) -> Result<Self> {
         let RuntimeConfig {
-            batch_size,
             memory_manager,
             disk_manager,
         } = config;
+        Ok(Self {
+            executor_id: None,
+            is_local: false,
+            memory_manager: MemoryManager::new(memory_manager),
+            disk_manager: DiskManager::try_new(disk_manager)?,
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            task_context_registry: None,
+            session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
+        })
+    }
 
+    /// Create a local env based on configuration
+    pub fn new_local_env(config: RuntimeConfig) -> Result<Self> {
+        let RuntimeConfig {
+            memory_manager,
+            disk_manager,
+        } = config;
         Ok(Self {
-            batch_size,
+            executor_id: None,
+            is_local: true,
             memory_manager: MemoryManager::new(memory_manager),
             disk_manager: DiskManager::try_new(disk_manager)?,
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            task_context_registry: None,
+            session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
         })
     }
 
-    /// Get execution batch size based on config
-    pub fn batch_size(&self) -> usize {
-        self.batch_size
+    /// Return the global singleton RuntimeEnv
+    pub fn global() -> &'static Arc<RuntimeEnv> {

Review comment:
       I am concerned that if we have this function, it will become too easy for DataFusion code to accidentally use the global `RuntimeEnv` rather than the one that is explicitly passed through.
   
   I would prefer that the global runtime be put in Ballista (or in a separate module in DataFusion).

##########
File path: benchmarks/src/bin/tpch.rs
##########
@@ -1282,10 +1289,10 @@ mod tests {
     async fn run_query(n: usize) -> Result<()> {
         // Tests running query with empty tables, to see whether they run succesfully.
 
-        let config = ExecutionConfig::new()
+        let config = SessionConfig::new()
             .with_target_partitions(1)
             .with_batch_size(10);
-        let mut ctx = ExecutionContext::with_config(config);
+        let ctx = SessionContext::with_config(config, RuntimeEnv::global());

Review comment:
       I like that the RuntimeEnv is explicitly passed here

##########
File path: datafusion/src/execution/runtime_env.rs
##########
@@ -25,47 +25,110 @@ use crate::{
         memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
     },
 };
-
+use std::fmt;
 use std::fmt::{Debug, Formatter};
+
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
+use crate::execution::context::{
+    SessionContextRegistry, TaskContext, TaskContextRegistry, BATCH_SIZE,
+    PARQUET_PRUNING, REPARTITION_AGGREGATIONS, REPARTITION_JOINS, REPARTITION_WINDOWS,
+    TARGET_PARTITIONS,
+};
+use crate::prelude::{SessionConfig, SessionContext};
+use datafusion_common::DataFusionError;
+use once_cell::sync::OnceCell;
+use parking_lot::Mutex;
+use std::path::PathBuf;
 use std::sync::Arc;
 
-#[derive(Clone)]
-/// Execution runtime environment. This structure is passed to the
-/// physical plans when they are run.
+/// Global singleton RuntimeEnv
+pub static RUNTIME_ENV: OnceCell<Arc<RuntimeEnv>> = OnceCell::new();
+
+/// Execution runtime environment. This structure is a singleton for each Scheduler/Executor instance.
 pub struct RuntimeEnv {
-    /// Default batch size while creating new batches
-    pub batch_size: usize,
+    /// Executor Id
+    pub executor_id: Option<String>,
+    /// Local Env
+    pub is_local: bool,
     /// Runtime memory management
     pub memory_manager: Arc<MemoryManager>,
     /// Manage temporary files during query execution
     pub disk_manager: Arc<DiskManager>,
+    /// Object Store that are registered within the Scheduler's or Executors' Runtime
+    pub object_store_registry: Arc<ObjectStoreRegistry>,
+    /// DataFusion task contexts that are registered within the Executors' Runtime
+    pub task_context_registry: Option<Arc<TaskContextRegistry>>,
+    /// DataFusion session contexts that are registered within the Scheduler's Runtime
+    pub session_context_registry: Option<Arc<SessionContextRegistry>>,
 }
 
-impl Debug for RuntimeEnv {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "RuntimeEnv")
+impl RuntimeEnv {
+    /// Create an executor env based on configuration
+    pub fn new_executor_env(config: RuntimeConfig, executor_id: String) -> Result<Self> {
+        let RuntimeConfig {
+            memory_manager,
+            disk_manager,
+        } = config;
+        Ok(Self {
+            executor_id: Some(executor_id),
+            is_local: false,
+            memory_manager: MemoryManager::new(memory_manager),
+            disk_manager: DiskManager::try_new(disk_manager)?,
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            task_context_registry: Some(Arc::new(TaskContextRegistry::new())),
+            session_context_registry: None,
+        })
     }
-}
 
-impl RuntimeEnv {
-    /// Create env based on configuration
-    pub fn new(config: RuntimeConfig) -> Result<Self> {
+    /// Create a scheduler env based on configuration
+    pub fn new_scheduler_env(config: RuntimeConfig) -> Result<Self> {
         let RuntimeConfig {
-            batch_size,
             memory_manager,
             disk_manager,
         } = config;
+        Ok(Self {
+            executor_id: None,
+            is_local: false,
+            memory_manager: MemoryManager::new(memory_manager),
+            disk_manager: DiskManager::try_new(disk_manager)?,
+            object_store_registry: Arc::new(ObjectStoreRegistry::new()),

Review comment:
       I like the idea of adding an `object_store_registry`, `task_context_registry` and `session_context_registry` to the `RuntimeEnv` -- I think having a way to track those items is a good one. 👍 

##########
File path: benchmarks/src/bin/tpch.rs
##########
@@ -1307,7 +1314,7 @@ mod tests {
             // load expected answers from tpch-dbgen
             // read csv as all strings, trim and cast to expected type as the csv string
             // to value parser does not handle data with leading/trailing spaces
-            let mut ctx = ExecutionContext::new();
+            let ctx = SessionContext::new();

Review comment:
       I wonder what you think about calling this function `SessionContext::create_default()` or something similar, to hint that it will be connected to the global runtime environment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org