Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/11 07:30:23 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request #1987: Refactor ExecutionContext and related conf to support multi-tenancy configurations - Part 1

mingmwang opened a new pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987


   
   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes https://github.com/apache/arrow-datafusion/issues/1862.
   
   This PR covers part 1: the trivial changes and unit-test (UT) fixes.
   
   1. Rename `ExecutionContext` to `SessionContext` and `ExecutionContextState` to `SessionState`.
   2. Add `TaskContext`, wrap the `RuntimeEnv` in it, and pass `TaskContext` down into `ExecutionPlan`'s `execute()` method.
   3. Fix all the trivial UTs.
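As a rough illustration of the change described above, here is a minimal, synchronous sketch of a `TaskContext` that wraps a `RuntimeEnv` and is handed down through `execute()`. Everything in it is a simplified stand-in and an assumption for illustration: the real `ExecutionPlan::execute` is `async` and returns a record-batch stream, and the `Scan` and `PassThrough` operators, the `batch_size` field, and the `Vec<usize>` return type are hypothetical.

```rust
use std::sync::Arc;

/// Stand-in for DataFusion's `RuntimeEnv` (hypothetical, heavily simplified).
#[derive(Debug)]
pub struct RuntimeEnv {
    pub batch_size: usize,
}

/// Simplified `TaskContext`: session id, optional task id, and the shared runtime.
#[derive(Debug)]
pub struct TaskContext {
    pub session_id: String,
    pub task_id: Option<String>,
    pub runtime: Arc<RuntimeEnv>,
}

/// Simplified, synchronous version of the `ExecutionPlan` trait.
pub trait ExecutionPlan {
    /// Return the sizes of the batches this operator emits for `partition`.
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Vec<usize>;
}

/// Hypothetical leaf operator: emits `rows` rows in batches of the configured size.
pub struct Scan {
    pub rows: usize,
}

impl ExecutionPlan for Scan {
    fn execute(&self, _partition: usize, context: Arc<TaskContext>) -> Vec<usize> {
        // The runtime is reached through the context instead of being passed directly.
        // Clamp to 1 so a zero batch size cannot loop forever.
        let batch_size = context.runtime.batch_size.max(1);
        let mut remaining = self.rows;
        let mut batches = Vec::new();
        while remaining > 0 {
            let n = remaining.min(batch_size);
            batches.push(n);
            remaining -= n;
        }
        batches
    }
}

/// Hypothetical pass-through operator: forwards the same context to its input.
pub struct PassThrough {
    pub input: Box<dyn ExecutionPlan>,
}

impl ExecutionPlan for PassThrough {
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Vec<usize> {
        self.input.execute(partition, context)
    }
}
```

Each operator only forwards the `Arc<TaskContext>` it receives, so later additions to the context do not require another signature change on `execute()`.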
   
   
    # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] mingmwang commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1066329662


   > remaining `avro` tests needed to be adjusted within this PR with `RuntimeEnv`: `read_f64_alltypes_plain_avro` and `read_binary_alltypes_plain_avro`.
   
   Fixed this.
   





[GitHub] [arrow-datafusion] xudong963 commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
xudong963 commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825422620



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {

Review comment:
       +1. I'm a little confused, too.







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825387905



##########
File path: datafusion/CHANGELOG.md
##########
@@ -56,7 +56,7 @@
 - Add `approx_quantile`  support [\#1538](https://github.com/apache/arrow-datafusion/issues/1538)
 - support sorting decimal data type [\#1522](https://github.com/apache/arrow-datafusion/issues/1522)
 - Keep all datafusion's packages up to date with Dependabot [\#1472](https://github.com/apache/arrow-datafusion/issues/1472)
-- ExecutionContext support init ExecutionContextState with `new(state: Arc<Mutex<ExecutionContextState>>)` method [\#1439](https://github.com/apache/arrow-datafusion/issues/1439)
+- SessionContext support init SessionState with `new(state: Arc<Mutex<SessionState>>)` method [\#1439](https://github.com/apache/arrow-datafusion/issues/1439)

Review comment:
       I second this, since they are auto-generated.







[GitHub] [arrow-datafusion] mingmwang commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825562403



##########
File path: datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -124,10 +124,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
     async fn execute(
         &self,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         Ok(Box::pin(CoalesceBatchesStream {
-            input: self.input.execute(partition, runtime).await?,
+            input: self.input.execute(partition, context).await?,

Review comment:
       Agreed. That's why I'm using a string `task_id` in `TaskContext` instead of a struct.  
   Maybe we should use another UUID to uniquely represent a Ballista task.
   Currently the system uses `task_id` and `partition_id` interchangeably. 
   
   In Ballista proto
   
   ````
   #[derive(Clone, PartialEq, ::prost::Message)]
   pub struct TaskDefinition {
       #[prost(message, optional, tag = "1")]
       pub task_id: ::core::option::Option<PartitionId>,
       #[prost(bytes = "vec", tag = "2")]
       pub plan: ::prost::alloc::vec::Vec<u8>,
       /// Output partition for shuffle writer
       #[prost(message, optional, tag = "3")]
       pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
   }
   ````
   
   In scheduler/mod.rs
   
   ````
   /// Unique identifier for the output partition of an operator.
   #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
   pub struct PartitionId {
       pub job_id: String,
       pub stage_id: usize,
       pub partition_id: usize,
   }
   ````
   
   I also agree that in DataFusion `Task` is a vague term. As @yahoNanJing mentioned earlier, the `TaskContext` is actually the context of the `execute()` method. To avoid confusion with the original `ExecutionContext`, I named it `TaskContext` rather than `ExecutionContext`. DuckDB actually calls it `ExecutionContext`. I'm open on the naming: if everyone agrees to use `ExecutionContext`, I can change it to avoid introducing a vague `Task` term into DataFusion.
   
   DuckDB code
   ````
   OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
   	                           OperatorState &state) const override;
   ````







[GitHub] [arrow-datafusion] mingmwang edited a comment on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang edited a comment on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1068864283


   @alamb @yjshen Conflicts resolved, please help to take a look and approve the merge.





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825379032



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {
+    /// Optional Task Identify
+    pub task_id: Option<String>,
+    /// Session Id
+    pub session_id: String,

Review comment:
       nit: move `session_id` above `task_id`?

##########
File path: datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -124,10 +124,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
     async fn execute(
         &self,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         Ok(Box::pin(CoalesceBatchesStream {
-            input: self.input.execute(partition, runtime).await?,
+            input: self.input.execute(partition, context).await?,

Review comment:
       `execute` with `partition` as well as `context.task_id.partition_id` will be vague after this PR.

##########
File path: ballista/rust/executor/src/execution_loop.rs
##########
@@ -124,6 +126,20 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
     info!("Received task {}", task_id_log);
     available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
 
+    let runtime = executor.ctx.runtime_env();
+
+    //TODO get session_id from TaskDefinition
+    let session_id = "mock_session".to_owned();
+    //TODO get task_props from TaskDefinition
+    let task_props = HashMap::new();
+
+    let task_context = Arc::new(TaskContext::new(
+        task_id_log.clone(),

Review comment:
       The `_log` suffix is a little bit weird. See comments above.

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {
+    /// Optional Task Identify
+    pub task_id: Option<String>,

Review comment:
       Probably we could make `task_id` a struct instead of String?  And we could replace the usage of `task_id_log` with simply `impl Display`.
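
A structured task id with `impl Display` might look like the sketch below. The `TaskId` name, the `job/stage/partition` output format, and the `received_task_message` helper are assumptions for illustration; the fields simply mirror Ballista's `PartitionId`.

```rust
use std::fmt;

/// Hypothetical structured task id; fields mirror Ballista's `PartitionId`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TaskId {
    pub job_id: String,
    pub stage_id: usize,
    pub partition_id: usize,
}

impl fmt::Display for TaskId {
    /// Render as `job/stage/partition` (an assumed format, chosen for readability).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}/{}", self.job_id, self.stage_id, self.partition_id)
    }
}

/// Build the log line directly from the id, so no separate `task_id_log` string is needed.
pub fn received_task_message(task_id: &TaskId) -> String {
    format!("Received task {}", task_id)
}
```

With `Display` implemented, call sites can log the id with `{}` and still access the individual fields when needed.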

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {

Review comment:
       `Task` is a vague term in the scope of DataFusion, I think. In the physical optimization phase, we `repartition` plans based on `config.target_partition` when possible. Does the task therefore represent the initial partition of the `DataSource`s? 
   
   A plausible solution might require a major change to the current framework by introducing the `Stage` term in the DataFusion core. Then we could:
   - Partition the input data set based on a conf like `max_bytes_per_partition`.
   - Process data with physical operators serially for each input partition until we meet a "synchronization barrier" required by operators such as sort or aggregate.
   - Add an exchange operator (or repartition), and continue the computation in another task from the successor stage.
   
   By introducing `Stage`s into the DataFusion core as well, we could make `task_id` required and make `task_context` the only parameter for `execute`.
   
   For the current PR, I think we should articulate what `Task` means for DataFusion.
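
To make the stage idea above concrete, here is a small sketch that splits a linear operator pipeline into stages at synchronization barriers. It is only an illustration under assumed names: the `Operator` enum, `is_barrier`, and `split_into_stages` are hypothetical and do not correspond to existing DataFusion types, and real plans are trees rather than flat lists.

```rust
/// Hypothetical physical operators; `Sort` and `Aggregate` stand for
/// operators that require a synchronization barrier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operator {
    Scan,
    Filter,
    Project,
    Sort,
    Aggregate,
}

impl Operator {
    /// True if the operator must see all of its input before producing output.
    fn is_barrier(self) -> bool {
        matches!(self, Operator::Sort | Operator::Aggregate)
    }
}

/// Split a linear pipeline (listed from source to sink) into stages.
/// A new stage starts at every barrier operator, which is where an
/// exchange (repartition) would be inserted.
pub fn split_into_stages(pipeline: &[Operator]) -> Vec<Vec<Operator>> {
    let mut stages: Vec<Vec<Operator>> = Vec::new();
    let mut current: Vec<Operator> = Vec::new();
    for &op in pipeline {
        if op.is_barrier() && !current.is_empty() {
            // Close the running stage before the barrier operator.
            stages.push(std::mem::take(&mut current));
        }
        current.push(op);
    }
    if !current.is_empty() {
        stages.push(current);
    }
    stages
}
```

Under this model a task would be one partition of one stage, which gives `task_id` a well-defined meaning.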

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       I think this depends on how `TaskProperties` will be populated and later used. If we want the ability to adjust configs on a per-`task` basis, then a combination is needed. 
   
   Otherwise, we could do branching with `TaskProperties` like:
   ```rust
   impl TaskProperties {
       fn conf(&self, conf_key: impl Into<String>) -> String {
           "".to_owned()
       }
   
       fn typed_conf<T: Default>(&self, conf_key: impl Into<String>) -> T {
           T::default()
       }
   
       fn batch_size(&self) -> usize {
           self.typed_conf("batch_size")
       }
   }
   ``` 
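
The stub above could be fleshed out roughly as follows. This is a sketch under assumptions, not the PR's implementation: `SessionConfig` is reduced to a single hypothetical `batch_size` field, `conf` returns `Option<String>` instead of an empty string, and `typed_conf` falls back to `T::default()` when a key is missing or unparsable.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Stand-in for DataFusion's `SessionConfig` with one setting (hypothetical).
#[derive(Debug, Clone)]
pub struct SessionConfig {
    pub batch_size: usize,
}

/// Same two variants as in the PR's `TaskProperties`.
pub enum TaskProperties {
    SessionConfig(SessionConfig),
    KVPairs(HashMap<String, String>),
}

impl TaskProperties {
    /// Raw string value for `key`, if the active variant can supply one.
    pub fn conf(&self, key: &str) -> Option<String> {
        match self {
            TaskProperties::KVPairs(props) => props.get(key).cloned(),
            TaskProperties::SessionConfig(config) => match key {
                "batch_size" => Some(config.batch_size.to_string()),
                _ => None,
            },
        }
    }

    /// Parsed value for `key`; falls back to `T::default()` when the key
    /// is missing or its value does not parse.
    pub fn typed_conf<T: FromStr + Default>(&self, key: &str) -> T {
        self.conf(key)
            .and_then(|value| value.parse().ok())
            .unwrap_or_default()
    }

    pub fn batch_size(&self) -> usize {
        self.typed_conf("batch_size")
    }
}
```

Silently defaulting is a design choice made here for brevity; returning a `Result` would surface misconfigured keys instead.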

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       Maybe in a follow-up PR, as the number of configurations grows, we should store the confs in `SessionConfig` as a `HashMap` as well?







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825378707



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       I think this depends on how `TaskProperties` will be populated and later used. If we want the ability to adjust configs on a per-`task` basis, then a combination is needed. 
   
   Otherwise, we could do branching with `TaskProperties` like:
   ```rust
   impl TaskProperties {
       fn conf(&self, conf_key: impl Into<String>) -> String {
           "".to_owned()
       }
   
       fn typed_conf<T: Default>(&self, conf_key: impl Into<String>) -> T {
           T::default()
       }
   
       fn batch_size(&self) -> usize {
           self.typed_conf("batch_size")
       }
   }
   ``` 







[GitHub] [arrow-datafusion] andygrove commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825516300



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {

Review comment:
       +1 for introducing `Stage` term in DataFusion. Ballista currently has `job_id`, `stage_id`, and `partition_id` (which perhaps could have been `task_id` instead).







[GitHub] [arrow-datafusion] mingmwang commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825543118



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       I will cover this in the follow-up PR. `batch_size` will be moved out of `RuntimeEnv` into `SessionConfig`.
   `SessionConfig` will then contain only the configuration entries, no `RuntimeEnv` anymore.
   `TaskContext` will have a method to get the current `SessionConfig`.
   
   ````
   impl TaskContext {
       /// Return the SessionConfig associated with the Task
       pub fn session_config(&self) -> SessionConfig {
           let task_settings = &self.task_settings;
           match task_settings {
               TaskProperties::KVPairs(props) => {
                   let session_config = SessionConfig::new();
                   session_config
                       .with_batch_size(props.get(BATCH_SIZE).unwrap().parse().unwrap())
                       .with_target_partitions(
                           props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
                       )
                       .with_repartition_joins(
                           props.get(REPARTITION_JOINS).unwrap().parse().unwrap(),
                       )
                       .with_repartition_aggregations(
                           props
                               .get(REPARTITION_AGGREGATIONS)
                               .unwrap()
                               .parse()
                               .unwrap(),
                       )
                       .with_repartition_windows(
                           props.get(REPARTITION_WINDOWS).unwrap().parse().unwrap(),
                       )
                       .with_parquet_pruning(
                           props.get(PARQUET_PRUNING).unwrap().parse().unwrap(),
                       )
               }
               TaskProperties::SessionConfig(session_config) => session_config.clone(),
           }
       }
   }
   
   ````
   
   In this PR, the batch size can currently be retrieved with the code below, but this will change in the follow-up PR.
   
   ````
    let batch_size = context.runtime.batch_size();
   
   ````
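
The `unwrap()` chain in the `KVPairs` branch above panics when a key is missing or malformed. One possible alternative is to return a `Result`, sketched below. The reduced two-field `SessionConfig`, the key constants' string values, and the `parse_prop` and `session_config_from_props` helpers are all hypothetical and only illustrate the error-handling pattern.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Reduced stand-in for `SessionConfig` with two settings (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
    pub parquet_pruning: bool,
}

// Assumed key names; the actual constants are not shown in this thread.
pub const BATCH_SIZE: &str = "batch_size";
pub const PARQUET_PRUNING: &str = "parquet_pruning";

/// Look up `key` and parse it, describing what went wrong on failure.
fn parse_prop<T: FromStr>(props: &HashMap<String, String>, key: &str) -> Result<T, String> {
    let raw = props
        .get(key)
        .ok_or_else(|| format!("missing property '{}'", key))?;
    raw.parse()
        .map_err(|_| format!("invalid value '{}' for property '{}'", raw, key))
}

/// Build a config from key-value pairs without panicking on bad input.
pub fn session_config_from_props(
    props: &HashMap<String, String>,
) -> Result<SessionConfig, String> {
    Ok(SessionConfig {
        batch_size: parse_prop(props, BATCH_SIZE)?,
        parquet_pruning: parse_prop(props, PARQUET_PRUNING)?,
    })
}
```

An executor receiving properties over the wire could then report a bad configuration to the scheduler rather than aborting the task with a panic.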







[GitHub] [arrow-datafusion] mingmwang commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825584839



##########
File path: datafusion/src/execution/context.rs
##########
@@ -132,32 +134,34 @@ use super::{
 /// # use datafusion::error::Result;
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
-/// let mut ctx = ExecutionContext::new();
+/// let mut ctx = SessionContext::new();
 /// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;
 /// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
 /// # Ok(())
 /// # }
 /// ```
 #[derive(Clone)]
-pub struct ExecutionContext {
+pub struct SessionContext {
+    /// Uuid for the session
+    pub session_id: String,
     /// Internal state for the context
-    pub state: Arc<Mutex<ExecutionContextState>>,
+    pub state: Arc<Mutex<SessionState>>,

Review comment:
       > It looks like there is an invariant between `session_id` and `state.session_id`. If so, it's better to keep `session_id` private so it always stays in sync with the id in state. We can add a `session_id` method to make the id readable to users.
   
   Done.
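
A minimal sketch of the pattern discussed in this comment: the id is private, set once at construction together with the state, and exposed through a read-only accessor. The `SessionState` stand-in, the `with_session_id` constructor, and the `state_session_id` helper are assumptions for illustration; in the PR itself the `state` field remains public.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for `SessionState`; only the id is modeled (hypothetical).
pub struct SessionState {
    session_id: String,
}

/// Simplified `SessionContext` whose id cannot be changed after construction.
#[derive(Clone)]
pub struct SessionContext {
    // Private so it cannot drift from `state.session_id`.
    session_id: String,
    state: Arc<Mutex<SessionState>>,
}

impl SessionContext {
    /// Create a context and its state with the same id (hypothetical constructor).
    pub fn with_session_id(session_id: impl Into<String>) -> Self {
        let session_id = session_id.into();
        let state = SessionState {
            session_id: session_id.clone(),
        };
        Self {
            session_id,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Read-only access to the session id.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    /// The id stored in the shared state; always equal to `session_id()`.
    pub fn state_session_id(&self) -> String {
        self.state.lock().unwrap().session_id.clone()
    }
}
```

Because both copies are written in one place and neither field is public, the invariant holds by construction.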







[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #1987: Refactor ExecutionContext and related conf to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1064895220


   Introducing the *TaskContext* that wraps the *RuntimeEnv* for the *ExecutionPlan* execute interface looks good.
   
   Renaming *ExecutionContext* to *SessionContext* is also much more meaningful for future multi-tenancy configurations.
   
   If it's not misleading, how about renaming *TaskContext* to *ExecutionContext*, which may be more consistent with *ExecutionPlan*? Then the change would be:
   - *ExecutionContext*  ----->   *SessionContext*
   - *RuntimeEnv*            ----->   *ExecutionContext* wrapping *RuntimeEnv*





[GitHub] [arrow-datafusion] mingmwang commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1068857371


   I will merge multiple commits in this PR and rebase to master to resolve the conflicts.





[GitHub] [arrow-datafusion] alamb commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1068821000


   Yeah, the sooner we can get this merged the better so that it doesn't pick up more conflicts and so that downstream crates can start making the required changes





[GitHub] [arrow-datafusion] yjshen commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1068679776


   @mingmwang would you please clean up the conflicts to get this merged?





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825343004



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {
+    /// Optional Task Identify
+    pub task_id: Option<String>,
+    /// Session Id
+    pub session_id: String,
+    /// Task settings
+    pub task_settings: TaskProperties,

Review comment:
       ```suggestion
       pub properties: TaskProperties,
   ```
   
   Maybe to be consistent with the type of the struct?







[GitHub] [arrow-datafusion] alamb commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1067923130


   It seems like most / all comments on this PR have been addressed. Are there any objections to clearing up the remaining conflicts, merging this one in, and refining in subsequent PRs?





[GitHub] [arrow-datafusion] mingmwang commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1068864283


   @alamb @yjshen Conflicts resolved, please help to take a look and approve the merge.





[GitHub] [arrow-datafusion] mingmwang commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825584677



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),
+}
+
+/// Task Execution Context
+pub struct TaskContext {
+    /// Optional Task Identify
+    pub task_id: Option<String>,
+    /// Session Id
+    pub session_id: String,

Review comment:
       > nit: move `session_id` above `task_id`?
   
   Done.







[GitHub] [arrow-datafusion] mingmwang commented on pull request #1987: Refactor ExecutionContext and related conf to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1064849042


   @alamb @yjshen @houqp 
   
   Please help to take a look.





[GitHub] [arrow-datafusion] mingmwang commented on pull request #1987: Refactor ExecutionContext and related conf to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1064853148


   @andygrove 





[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1068943552


   @yjshen please add an `api-change` label to this PR or issue.
   Thanks





[GitHub] [arrow-datafusion] alamb commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1069057827


   Thank you @mingmwang !





[GitHub] [arrow-datafusion] xudong963 commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
xudong963 commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825421723



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {
+    ///SessionConfig
+    SessionConfig(SessionConfig),
+    /// KV pairs
+    KVPairs(HashMap<String, String>),

Review comment:
       nit: would it be better to give `KVPairs` a more meaningful name?







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825289854



##########
File path: benchmarks/src/bin/tpch.rs
##########
@@ -584,13 +583,13 @@ fn get_query_sql(query: usize) -> Result<String> {
     }
 }
 
-fn create_logical_plan(ctx: &mut ExecutionContext, query: usize) -> Result<LogicalPlan> {
+fn create_logical_plan(ctx: &mut SessionContext, query: usize) -> Result<LogicalPlan> {
     let sql = get_query_sql(query)?;
     ctx.create_logical_plan(&sql)
 }
 
 async fn execute_query(
-    ctx: &mut ExecutionContext,
+    ctx: &SessionContext,

Review comment:
       it is a nice improvement to remove some of this `mut` 👍 
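
For readers wondering how a context can be used through `&SessionContext` at all: one general way in Rust is to keep the mutable state behind `Arc<Mutex<...>>`, as the `state` field in this PR does. The sketch below only illustrates that pattern and is not necessarily why the PR could drop `mut`. The `tables` map, `register_table`, and `table_location` are hypothetical names invented for the example.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified session state: just a table-name -> location map.
#[derive(Default)]
pub struct SessionState {
    tables: HashMap<String, String>,
}

/// Simplified stand-in for `SessionContext`; clones share the same state.
#[derive(Clone, Default)]
pub struct SessionContext {
    state: Arc<Mutex<SessionState>>,
}

impl SessionContext {
    /// Registers a table through `&self`; the mutex supplies the mutability.
    pub fn register_table(&self, name: &str, location: &str) {
        let mut state = self.state.lock().unwrap();
        state.tables.insert(name.to_string(), location.to_string());
    }

    /// Looks up a table location, returning an owned copy.
    pub fn table_location(&self, name: &str) -> Option<String> {
        self.state.lock().unwrap().tables.get(name).cloned()
    }
}

/// Takes `&SessionContext`, like the updated `execute_query` signature.
pub fn execute_query(ctx: &SessionContext, table: &str) -> Option<String> {
    ctx.table_location(table)
}
```

Because the lock is taken inside each method, callers never need an exclusive borrow of the context itself.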

##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -177,6 +179,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         );
         info!("Start to run task {}", task_id_log);
 
+        let runtime = self.executor.ctx.runtime_env();
+
+        //TODO get session_id from TaskDefinition

Review comment:
       is this a TODO for a future PR?

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       It seems like `TaskProperties` might want *both* the session config as well as possibly key value pairs (rather than either / or)
   
   What about something like
   
   ```rust
   pub struct TaskProperties {
     config: SessionConfig,
     kv_pairs: Option<HashMap<String, String>>,
   }
   ```
   
   ? 
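
A minimal sketch of how the combined shape suggested above could be consumed, assuming that a `batch_size` entry in `kv_pairs` overrides the session-level value. Both the key name and the override rule are assumptions made for illustration, and `SessionConfig` here is a one-field stand-in rather than the real DataFusion type.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for DataFusion's `SessionConfig`; one field only.
#[derive(Debug, Clone)]
pub struct SessionConfig {
    pub batch_size: usize,
}

/// Task properties carrying the session config plus optional overrides.
#[derive(Debug, Clone)]
pub struct TaskProperties {
    pub config: SessionConfig,
    pub kv_pairs: Option<HashMap<String, String>>,
}

impl TaskProperties {
    /// Batch size for this task. A parsable `batch_size` entry in `kv_pairs`
    /// wins (assumed rule); otherwise the session-level value is used.
    pub fn batch_size(&self) -> usize {
        self.kv_pairs
            .as_ref()
            .and_then(|kv| kv.get("batch_size"))
            .and_then(|v| v.parse().ok())
            .unwrap_or(self.config.batch_size)
    }
}
```

With a struct, callers no longer have to match on an enum variant before reading a setting.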

##########
File path: datafusion/src/datasource/file_format/parquet.rs
##########
@@ -419,7 +423,8 @@ mod tests {
 
     #[tokio::test]
     async fn read_alltypes_plain_parquet() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = Arc::new(TaskContext::from(&session_ctx));

Review comment:
       Since this is such a common pattern (creating an `Arc<TaskContext>`), I wonder if it would make sense to create a function for it?
   
   like
   
   ```rust
   impl SessionContext {
     /// Get a new TaskContext to run in this session
     pub fn task_ctx(&self) -> Arc<TaskContext> {
       ...
     }
   }
   ```

##########
File path: datafusion/CHANGELOG.md
##########
@@ -56,7 +56,7 @@
 - Add `approx_quantile`  support [\#1538](https://github.com/apache/arrow-datafusion/issues/1538)
 - support sorting decimal data type [\#1522](https://github.com/apache/arrow-datafusion/issues/1522)
 - Keep all datafusion's packages up to date with Dependabot [\#1472](https://github.com/apache/arrow-datafusion/issues/1472)
-- ExecutionContext support init ExecutionContextState with `new(state: Arc<Mutex<ExecutionContextState>>)` method [\#1439](https://github.com/apache/arrow-datafusion/issues/1439)
+- SessionContext support init SessionState with `new(state: Arc<Mutex<SessionState>>)` method [\#1439](https://github.com/apache/arrow-datafusion/issues/1439)

Review comment:
       We probably should revert changes to the CHANGELOG for past releases

##########
File path: ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
##########
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use async_trait::async_trait;

Review comment:
       BTW nice job catching all instances of `ExecutionContext`:
   
   ```
   -*- mode: grep; default-directory: "~/Software/arrow-datafusion/" -*-
   Grep started at Sat Mar 12 06:43:13
   
   rg -n -H --no-heading -e 'ExecutionContext' $(git rev-parse --show-toplevel || pwd)
   
   Grep finished with no matches found at Sat Mar 12 06:43:13
   
   ```

##########
File path: ballista/rust/core/src/execution_plans/shuffle_writer.rs
##########
@@ -138,11 +138,11 @@ impl ShuffleWriterExec {
     pub async fn execute_shuffle_write(
         &self,
         input_partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        context: Arc<TaskContext>,

Review comment:
       I think the idea of having a `TaskContext` that can have per-plan / per-task state (in addition to overall RuntimeEnv) is a significant improvement
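
To make the idea concrete, here is a rough sketch of a per-task context that wraps the shared runtime and is handed down from parent to child operators. Everything in it is simplified and hypothetical: the trait is synchronous, `RuntimeEnv` has a single made-up `batch_size` field, and `Scan` and `Wrapper` are invented operators that return strings instead of record batch streams.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the shared runtime environment.
pub struct RuntimeEnv {
    pub batch_size: usize,
}

/// Per-task context: task-level identity plus the shared runtime.
pub struct TaskContext {
    pub session_id: String,
    pub task_id: Option<String>,
    pub runtime: Arc<RuntimeEnv>,
}

/// Simplified, synchronous stand-in for an execution plan node.
pub trait ExecutionPlan {
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Vec<String>;
}

/// Leaf operator that reads both session-level and runtime-level values.
pub struct Scan;

impl ExecutionPlan for Scan {
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Vec<String> {
        vec![format!(
            "scan partition {} for session {} (batch_size {})",
            partition, context.session_id, context.runtime.batch_size
        )]
    }
}

/// Parent operator that forwards the same context to its input.
pub struct Wrapper {
    pub input: Box<dyn ExecutionPlan>,
}

impl ExecutionPlan for Wrapper {
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Vec<String> {
        // Cloning the Arc is cheap; parent and child see the same task state.
        let mut out = self.input.execute(partition, Arc::clone(&context));
        out.push(format!("wrapper saw task {:?}", context.task_id));
        out
    }
}
```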







[GitHub] [arrow-datafusion] mingmwang commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825534941



##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -177,6 +179,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         );
         info!("Start to run task {}", task_id_log);
 
+        let runtime = self.executor.ctx.runtime_env();
+
+        //TODO get session_id from TaskDefinition

Review comment:
       Yes, will cover this in a future PR.







[GitHub] [arrow-datafusion] mingmwang commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
mingmwang commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825584582



##########
File path: datafusion/src/datasource/file_format/parquet.rs
##########
@@ -419,7 +423,8 @@ mod tests {
 
     #[tokio::test]
     async fn read_alltypes_plain_parquet() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = Arc::new(TaskContext::from(&session_ctx));

Review comment:
       > Since this is such a common pattern (creating an `Arc<TaskContext>`), I wonder if it would make sense to create a function for it?
   > 
   > like
   > 
   > ```rust
   > impl SessionContext {
   >   /// Get a new TaskContext to run in this session
   >   pub fn task_ctx(&self) -> Arc<TaskContext> {
   >     ...
   >   }
   > }
   > ```
   
   Done.
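
For illustration, a sketch of what such a helper could look like on heavily simplified stand-in types. The fields shown and the `new(session_id)` constructor are assumptions made so the example is self-contained; only `TaskContext::from(&session_ctx)` and the `task_ctx` signature come from the discussion above.

```rust
use std::sync::Arc;

/// Hypothetical, heavily simplified stand-in for the real session context.
pub struct SessionContext {
    session_id: String,
}

/// Hypothetical, simplified task context.
pub struct TaskContext {
    pub session_id: String,
    pub task_id: Option<String>,
}

impl From<&SessionContext> for TaskContext {
    fn from(ctx: &SessionContext) -> Self {
        TaskContext {
            session_id: ctx.session_id.clone(),
            // No task id is known when the context is built from a session.
            task_id: None,
        }
    }
}

impl SessionContext {
    /// Assumed constructor for the example; the real `new()` takes no id.
    pub fn new(session_id: impl Into<String>) -> Self {
        SessionContext { session_id: session_id.into() }
    }

    /// Get a new TaskContext to run in this session
    pub fn task_ctx(&self) -> Arc<TaskContext> {
        Arc::new(TaskContext::from(self))
    }
}
```

Call sites then shrink from `Arc::new(TaskContext::from(&session_ctx))` to `session_ctx.task_ctx()`.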







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825343741



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       I think this change is important (though we could do it as a follow on PR)
   
   Without it, I was forced to use the following code in https://github.com/influxdata/influxdb_iox/pull/4023 to get the batch size, which was quite messy:
   
   ```rust
       async fn execute(
           &self,
           _partition: usize,
           context: Arc<TaskContext>,
       ) -> DataFusionResult<SendableRecordBatchStream> {
           let batch_size = {
               if let TaskProperties::SessionConfig(config) = &context.task_settings {
                   config.runtime.batch_size
               } else {
                   todo!("Need to always have properties")
               }
           };
   ```







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825378707



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
     }
 }
 
+/// Task Context Properties
+pub enum TaskProperties {

Review comment:
       I think this depends on how `TaskProperties` will be populated and later used. If we want the ability to adjust configs on a per-task basis, then a combination is needed.
   
   Otherwise, we could do the branching inside `TaskProperties`, like:
   ```rust
   impl TaskProperties {
       /// Raw string value for a key (placeholder body).
       fn conf(&self, _conf_key: impl Into<String>) -> String {
           "".to_owned()
       }
   
       /// Value for a key converted to `T` (placeholder body).
       fn typed_conf<T: Default>(&self, _conf_key: impl Into<String>) -> T {
           T::default()
       }
   
       fn batch_size(&self) -> usize {
           self.typed_conf("batch_size")
       }
   }
   ```
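
One possible way to fill in those accessor bodies, sketched under assumptions: the properties are backed by a plain string map, parsing goes through `FromStr`, and a missing or unparsable value falls back to a default. `DEFAULT_BATCH_SIZE` is an arbitrary illustrative constant, and the `Option`-returning signatures differ from the placeholders above.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Arbitrary fallback used only in this example.
pub const DEFAULT_BATCH_SIZE: usize = 8192;

/// Hypothetical key-value-only version of the task properties.
pub struct TaskProperties {
    kv_pairs: HashMap<String, String>,
}

impl TaskProperties {
    pub fn new(kv_pairs: HashMap<String, String>) -> Self {
        TaskProperties { kv_pairs }
    }

    /// Raw string value for `conf_key`, if present.
    pub fn conf(&self, conf_key: &str) -> Option<&str> {
        self.kv_pairs.get(conf_key).map(String::as_str)
    }

    /// Value for `conf_key` parsed into `T`; `None` if missing or unparsable.
    pub fn typed_conf<T: FromStr>(&self, conf_key: &str) -> Option<T> {
        self.conf(conf_key).and_then(|raw| raw.parse().ok())
    }

    /// Batch size, falling back to `DEFAULT_BATCH_SIZE`.
    pub fn batch_size(&self) -> usize {
        self.typed_conf("batch_size").unwrap_or(DEFAULT_BATCH_SIZE)
    }
}
```

Operators would then call `batch_size()` without knowing how the value is stored.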







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825396036



##########
File path: datafusion/src/execution/context.rs
##########
@@ -132,32 +134,34 @@ use super::{
 /// # use datafusion::error::Result;
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
-/// let mut ctx = ExecutionContext::new();
+/// let mut ctx = SessionContext::new();
 /// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;
 /// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
 /// # Ok(())
 /// # }
 /// ```
 #[derive(Clone)]
-pub struct ExecutionContext {
+pub struct SessionContext {
+    /// Uuid for the session
+    pub session_id: String,
     /// Internal state for the context
-    pub state: Arc<Mutex<ExecutionContextState>>,
+    pub state: Arc<Mutex<SessionState>>,

Review comment:
       It looks like there is an invariant between `session_id` and `state.session_id`. If so, it's better to keep `session_id` private so it is always in sync with the id in state. We can add a `session_id()` method to make the id readable by users.
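
A small sketch of that suggestion, using simplified stand-in types: the id is copied out of the state once at construction, kept private, and exposed through a read-only accessor. The `with_state` constructor and the private id on `SessionState` are assumptions added for the example.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified session state that owns the id.
pub struct SessionState {
    session_id: String,
}

impl SessionState {
    pub fn new(session_id: impl Into<String>) -> Self {
        SessionState { session_id: session_id.into() }
    }

    pub fn session_id(&self) -> &str {
        &self.session_id
    }
}

#[derive(Clone)]
pub struct SessionContext {
    // Private: callers cannot reassign it, so it cannot drift from the state.
    session_id: String,
    pub state: Arc<Mutex<SessionState>>,
}

impl SessionContext {
    /// Assumed constructor: copies the id out of the state exactly once.
    pub fn with_state(state: SessionState) -> Self {
        let session_id = state.session_id.clone();
        SessionContext {
            session_id,
            state: Arc::new(Mutex::new(state)),
        }
    }

    /// Read-only access to the session id, without taking the lock.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }
}
```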







[GitHub] [arrow-datafusion] yjshen commented on pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#issuecomment-1068939495


   Thank you @mingmwang for being persistent and driving through this big change step by step. 





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987#discussion_r826671084



##########
File path: datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -124,10 +124,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
     async fn execute(
         &self,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         Ok(Box::pin(CoalesceBatchesStream {
-            input: self.input.execute(partition, runtime).await?,
+            input: self.input.execute(partition, context).await?,

Review comment:
       Since `ExecutionContext` is currently used for something quite different in DataFusion (what this PR renames to `SessionContext`), defining it to be something else may be quite confusing
   
   Perhaps something like `RunContext` or `RuntimeContext` would be less vague than `TaskContext` but also not redefine the `ExecutionContext` term?







[GitHub] [arrow-datafusion] yjshen merged pull request #1987: Rename `ExecutionContext` to `SessionContext`, `ExecutionContextState` to `SessionState`, add `TaskContext` to support multi-tenancy configurations - Part 1

Posted by GitBox <gi...@apache.org>.
yjshen merged pull request #1987:
URL: https://github.com/apache/arrow-datafusion/pull/1987


   

