Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/13 04:39:41 UTC

[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

yjshen commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825379032



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {
+    /// Optional Task Identify
+    pub task_id: Option<String>,
+    /// Session Id
+    pub session_id: String,

Review comment:
       nit: move `session_id` above `task_id`?

##########
File path: datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -124,10 +124,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
     async fn execute(
         &self,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         Ok(Box::pin(CoalesceBatchesStream {
-            input: self.input.execute(partition, runtime).await?,
+            input: self.input.execute(partition, context).await?,

Review comment:
       After this PR, having `execute` take a `partition` argument while `context.task_id.partition_id` also exists will be ambiguous.

##########
File path: ballista/rust/executor/src/execution_loop.rs
##########
@@ -124,6 +126,20 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
     info!("Received task {}", task_id_log);
     available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
 
+    let runtime = executor.ctx.runtime_env();
+
+    //TODO get session_id from TaskDefinition
+    let session_id = "mock_session".to_owned();
+    //TODO get task_props from TaskDefinition
+    let task_props = HashMap::new();
+
+    let task_context = Arc::new(TaskContext::new(
+        task_id_log.clone(),

Review comment:
       The `_log` suffix is a little bit weird. See comments above.

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {
+    /// Optional Task Identify
+    pub task_id: Option<String>,

Review comment:
       Perhaps we could make `task_id` a struct instead of a `String`? Then we could replace the `task_id_log` usage with a simple `impl Display` (see the sketch below).
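   
   For illustration, a minimal sketch of what such a struct could look like (the field names are assumptions, not something this PR defines):
   
   ```rust
   use std::fmt;
   
   /// Hypothetical structured task identifier; field names are assumptions.
   pub struct TaskId {
       pub job_id: String,
       pub stage_id: usize,
       pub partition_id: usize,
   }
   
   impl fmt::Display for TaskId {
       fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
           write!(f, "{}/{}/{}", self.job_id, self.stage_id, self.partition_id)
       }
   }
   ```
   
   Call sites like `info!("Received task {}", task_id_log)` could then format the id directly instead of building a separate log string.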

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {

Review comment:
       `Task` is a vague term in the scope of DataFusion, I think. In the physical optimization phase, we `repartition` plans based on `config.target_partition` when possible. Does the task then represent the initial partition of the `DataSource`s?
   
   A plausible solution might require a major change to the current framework: introducing the `Stage` term into the DataFusion core. Then we could:
   - Partition the input data set based on a conf like `max_bytes_per_partition`.
   - Process data with physical operators serially for each input partition until we meet a "synchronization barrier" required by operators such as sort or aggregate.
   - Add an exchange operator (or repartition), and continue the computation in another task from the successor stage.
   
   By introducing `Stage`s into the DataFusion core as well, we could make `task_id` required and make `task_context` the only parameter of `execute` (see the sketch below).
   
   For the current PR, I think we should articulate what `Task` means for DataFusion.
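   
   To make that concrete, a sketch (not the actual trait definition) of the `execute` signature this would enable, assuming `TaskContext` carries a required, structured `task_id`:
   
   ```rust
   use std::sync::Arc;
   
   use async_trait::async_trait;
   use datafusion::error::Result;
   use datafusion::physical_plan::SendableRecordBatchStream;
   
   // Sketch only: `TaskContext` is the type this PR introduces. With a
   // required `task_id`, the partition to run is derived from the context,
   // so `execute` no longer needs a separate `partition` argument.
   #[async_trait]
   pub trait ExecutionPlan {
       async fn execute(
           &self,
           context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream>;
   }
   ```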

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       I think this depends on how `TaskProperties` will be populated and later used. If we are asking for the ability to adjust configs on a per-`task` basis, then a combination of the two is needed (see the sketch after the code block below).
   
   Otherwise, we could do branching with `TaskProperties` like:
   ```rust
   impl TaskProperties {
       /// Look up a raw config value by key.
       fn conf(&self, conf_key: impl Into<String>) -> String {
           let key = conf_key.into();
           match self {
               // stub: SessionConfig would need its own by-key lookup here
               TaskProperties::SessionConfig(_conf) => "".to_owned(),
               TaskProperties::KVPairs(map) => map.get(&key).cloned().unwrap_or_default(),
           }
       }
   
       /// Look up a config value and parse it into `T` (stubbed for now).
       fn typed_conf<T: Default>(&self, conf_key: impl Into<String>) -> T {
           let _ = self.conf(conf_key);
           T::default()
       }
   
       fn batch_size(&self) -> usize {
           self.typed_conf("batch_size")
       }
   }
   ```
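   
   If per-task adjustment is the goal, a hypothetical combined shape (the field names are assumptions) might look like:
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical combination: per-task key/value overrides are consulted
   /// first, falling back to the session-level configuration.
   pub struct TaskProperties {
       session_config: SessionConfig,
       overrides: HashMap<String, String>,
   }
   ```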

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       Maybe in a follow-up PR, as the number of configurations grows, we should store the confs in `SessionConfig` as a `HashMap` as well?
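   
   For illustration, a minimal sketch of a `HashMap`-backed config with a typed accessor (the key name and default value are assumptions):
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical HashMap-backed session configuration.
   #[derive(Default)]
   pub struct SessionConfig {
       options: HashMap<String, String>,
   }
   
   impl SessionConfig {
       /// Builder-style setter for any string-valued option.
       pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
           self.options.insert(key.into(), value.into());
           self
       }
   
       /// Typed accessor; the "batch_size" key and 8192 default are assumptions.
       pub fn batch_size(&self) -> usize {
           self.options
               .get("batch_size")
               .and_then(|v| v.parse().ok())
               .unwrap_or(8192)
       }
   }
   ```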



