Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/05 17:16:41 UTC

[GitHub] [arrow-ballista] thinkharderdev opened a new pull request, #59: [Draft] Support for multi-scheduler deployments

thinkharderdev opened a new pull request, #59:
URL: https://github.com/apache/arrow-ballista/pull/59

   # Which issue does this PR close?
   
   
   Closes #39 
   
   I'm posting this draft PR for review and feedback. Some TODO items are still in progress (mostly cleanup and unit test coverage), but the change passes the integration tests as is, so it should be ready for a "test-drive" now. 
   
   TODO (before merging)
   
   - [ ] Additional unit test coverage
   - [ ] Fix `ExecuteQuery` so we don't do all the planning as part of the gRPC handler (I changed this to simplify things while testing but need to revert back to the old implementation where we do the planning in an async task in the event loop)
   - [ ] General cleanup and documentation. 
   
   # Rationale for this change
   
   See #39 for a complete description but this change addresses the following issues:
   1. Allow for deployments with multiple schedulers for high-availability and scalability. 
   2. Maintain fine-grained task state in persistent storage so even single-scheduler deployments are more robust to scheduler restarts.  
   
   # What changes are included in this PR?
   
   This is quite a large refactor. See the key points below: 
   
   ## Event Loop
   
   The core event loop remains mostly unchanged. The biggest change is that stage scheduling has been mostly internalized by the `ExecutionGraph`, so we only publish `JobSubmitted` and `JobCompleted` query stage events. Likewise, the actual structure of `SchedulerServerEvents` is changed (see details below). 
   
   ## ExecutionGraph
   
   In the current implementation, the state of a job is spread across a number of different data structures, so it would be difficult to move to external state in its current form, especially in light of requirements around distributed locks.
   
   In this change, the `TaskScheduler` and `StageManager` have been replaced with an `ExecutionGraph` data structure which is serializable (so it can be persisted in the state backend easily) and internalizes both the underlying execution plan and the dependencies between query stages. This data structure also tracks the order of execution and the availability of tasks, and it presents an interface that allows the scheduler to see a job as a stream of discrete tasks, which I believe makes the logic in the scheduler loop more straightforward.
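
To make the "stream of discrete tasks" idea concrete, here is a minimal sketch. It is not the PR's actual `ExecutionGraph`: the `Graph` and `Stage` types, their fields, and the `pop_next_task` / `complete_task` methods are hypothetical names chosen for illustration, and the execution plan itself is left out entirely. The only point is that the caller never reasons about stages; it just asks for the next runnable task.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified stage (not the PR's actual type).
#[derive(Debug)]
struct Stage {
    /// Number of tasks (one per partition) in this stage.
    partitions: usize,
    /// Index of the next task that has not been handed out yet.
    next_task: usize,
    /// Number of tasks that have completed.
    completed: usize,
    /// Stage that consumes this stage's output; `None` for the final stage.
    output_link: Option<usize>,
    /// Number of input stages that have not finished yet.
    pending_inputs: usize,
}

/// Hypothetical graph that hides stage ordering behind a task stream.
#[derive(Debug, Default)]
struct Graph {
    stages: BTreeMap<usize, Stage>,
}

impl Graph {
    fn add_stage(
        &mut self,
        id: usize,
        partitions: usize,
        output_link: Option<usize>,
        pending_inputs: usize,
    ) {
        let stage = Stage { partitions, next_task: 0, completed: 0, output_link, pending_inputs };
        self.stages.insert(id, stage);
    }

    /// Hand out the next runnable task as `(stage_id, partition)`, if any.
    /// A stage is runnable only once all of its input stages have finished.
    fn pop_next_task(&mut self) -> Option<(usize, usize)> {
        for (id, stage) in self.stages.iter_mut() {
            if stage.pending_inputs == 0 && stage.next_task < stage.partitions {
                let partition = stage.next_task;
                stage.next_task += 1;
                return Some((*id, partition));
            }
        }
        None
    }

    /// Record one completed task. When the last task of a stage completes,
    /// the stage it links to has one fewer pending input.
    fn complete_task(&mut self, stage_id: usize) {
        let finished_link = match self.stages.get_mut(&stage_id) {
            Some(stage) => {
                stage.completed += 1;
                if stage.completed == stage.partitions { stage.output_link } else { None }
            }
            None => None,
        };
        if let Some(consumer) = finished_link.and_then(|id| self.stages.get_mut(&id)) {
            consumer.pending_inputs = consumer.pending_inputs.saturating_sub(1);
        }
    }
}
```

With a two-task stage feeding a one-task stage, `pop_next_task` returns `None` after the first two tasks are handed out and only yields the downstream task once both upstream tasks have been reported through `complete_task`.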
   
   ## ExecutorReservation
   
   This data structure represents a reserved task slot on a given executor which may optionally be tied to a specific job ID. The idea here is that to avoid lock contention on the backend data store, we "hang on" to task slots through the scheduler loop instead of immediately returning them to the pool. So when a scheduler gets a task status update, it has a new task slot reservation that it can try to fill and will only return that task slot to the pool if it cannot find a task that is ready. 
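
As a rough illustration of the reservation idea, the sketch below models a reservation as an executor ID plus an optional job ID, and a slot pool that stands in for the shared state backend. The field names, the `SlotPool` type, and the `reserve` / `cancel` methods are assumptions made for this example, not the PR's actual API; the real pool lives in etcd or Sled and is accessed under a lock.

```rust
use std::collections::BTreeMap;

/// Hypothetical shape of a reserved task slot; field names are assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ExecutorReservation {
    executor_id: String,
    /// Set when the slot should preferably go to a task from this job.
    job_id: Option<String>,
}

impl ExecutorReservation {
    fn new_free(executor_id: &str) -> Self {
        Self { executor_id: executor_id.to_string(), job_id: None }
    }

    fn new_assigned(executor_id: &str, job_id: &str) -> Self {
        Self { executor_id: executor_id.to_string(), job_id: Some(job_id.to_string()) }
    }
}

/// In-memory stand-in for the free-slot counts kept in the state backend.
#[derive(Debug, Default)]
struct SlotPool {
    free: BTreeMap<String, u32>,
}

impl SlotPool {
    /// Take up to `n` free slots out of the pool and hand them to the caller.
    fn reserve(&mut self, n: usize) -> Vec<ExecutorReservation> {
        let mut out = Vec::new();
        for (executor_id, free) in self.free.iter_mut() {
            while *free > 0 && out.len() < n {
                *free -= 1;
                out.push(ExecutorReservation::new_free(executor_id));
            }
        }
        out
    }

    /// Return reservations that could not be filled to the pool.
    fn cancel(&mut self, reservations: Vec<ExecutorReservation>) {
        for r in reservations {
            *self.free.entry(r.executor_id).or_insert(0) += 1;
        }
    }
}
```

The design choice this illustrates: the pool is only touched in `reserve` and `cancel`. A scheduler that receives a task status update can build a reservation directly (for example with `new_assigned`) because it already knows the slot is free, and it only goes back to the pool if nothing is ready to run.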
   
   ## TaskManager
   
   The `TaskManager` encapsulates task/job management within the scheduler. The two most important things the `TaskManager` does are:
   * `fill_reservations`, which takes a list of `ExecutorReservation`s and tries to assign a task to each one (with preference given to the reservation's `job_id` if present). See the doc comment for details about the implementation.
   * `update_task_statuses`, which applies task status updates received from the executors and returns a list of `QueryStageSchedulerEvent`s along with a list of `ExecutorReservation`s to be filled by the scheduler. See the doc comment for details about the implementation.
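
The job-preference rule in `fill_reservations` can be sketched as follows. This is a simplified, synchronous, in-memory approximation written for illustration: the `Task` type, the `pending` queue layout, and the return type are assumptions, and the real method works against the state backend rather than a local map.

```rust
use std::collections::{BTreeMap, VecDeque};

#[derive(Debug, Clone, PartialEq, Eq)]
struct ExecutorReservation {
    executor_id: String,
    job_id: Option<String>,
}

/// Hypothetical task descriptor.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Task {
    job_id: String,
    stage_id: usize,
    partition: usize,
}

/// Hypothetical task manager holding ready tasks per job.
#[derive(Default)]
struct TaskManager {
    pending: BTreeMap<String, VecDeque<Task>>,
}

impl TaskManager {
    fn pop_for_job(&mut self, job_id: &str) -> Option<Task> {
        self.pending.get_mut(job_id)?.pop_front()
    }

    fn pop_any(&mut self) -> Option<Task> {
        self.pending.values_mut().find_map(|queue| queue.pop_front())
    }

    /// Try to assign one task per reservation. A reservation tied to a job
    /// is offered to that job first and falls back to any other ready task.
    /// Returns `(assignments, reservations that could not be filled)`.
    fn fill_reservations(
        &mut self,
        reservations: Vec<ExecutorReservation>,
    ) -> (Vec<(String, Task)>, Vec<ExecutorReservation>) {
        let mut assigned = Vec::new();
        let mut unassigned = Vec::new();
        for r in reservations {
            let preferred = r.job_id.as_deref().and_then(|id| self.pop_for_job(id));
            match preferred.or_else(|| self.pop_any()) {
                Some(task) => assigned.push((r.executor_id, task)),
                None => unassigned.push(r),
            }
        }
        (assigned, unassigned)
    }
}
```

The unassigned reservations in the second element of the result are the ones a caller would cancel, returning their slots to the pool.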
   
   ## StateBackendClient
   
   I've changed this trait slightly to help with this use case:
   * Added the concept of a `Keyspace`, which is just a namespace for keys. We already use "namespace" for something else, so I didn't want to overload the term. Since `Keyspace` is an enum, this mostly encodes a desired structure for the storage layer and helps remove boilerplate elsewhere in the codebase. 
   * Made the `lock` method scoped to a `Keyspace` and `key` so we can lock individual resources. Using a single global mutex on the state is probably not scalable. 
   * Added a `scan` method which will return all key/values in a particular keyspace (with an optional limit). 
   * Added a `scan_keys` method which does the same as `scan` but only returns keys. 
   * Added a `put_txn` method which allows atomically updating multiple key/values. Since we do batch updates in many places, this simplifies error handling. It is implemented in both Sled (using batch operations) and etcd (using transactions). 
   * Added a `mv` method (because `move` is a reserved word in Rust :)) that will atomically move a key from one keyspace to another. 
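
The sketch below shows how the keyspace-scoped operations fit together, using a plain in-memory map. It is only an approximation under stated assumptions: the `Keyspace` variant names, the `InMemoryBackend` type, and all signatures are made up for this example, the methods are synchronous whereas a real backend client would do I/O, and `lock` is omitted. Atomicity here comes for free from the exclusive `&mut self` borrow, which is not how Sled or etcd provide it.

```rust
use std::collections::BTreeMap;

/// Hypothetical keyspaces; the variant names are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Keyspace {
    ActiveJobs,
    CompletedJobs,
}

/// Toy in-memory stand-in for a state backend.
#[derive(Default)]
struct InMemoryBackend {
    data: BTreeMap<(Keyspace, String), Vec<u8>>,
}

impl InMemoryBackend {
    /// Apply a batch of writes as one unit.
    fn put_txn(&mut self, ops: Vec<(Keyspace, String, Vec<u8>)>) {
        for (keyspace, key, value) in ops {
            self.data.insert((keyspace, key), value);
        }
    }

    /// Return key/value pairs in one keyspace, optionally limited.
    fn scan(&self, keyspace: Keyspace, limit: Option<usize>) -> Vec<(String, Vec<u8>)> {
        self.data
            .iter()
            .filter(|((ks, _), _)| *ks == keyspace)
            .take(limit.unwrap_or(usize::MAX))
            .map(|((_, key), value)| (key.clone(), value.clone()))
            .collect()
    }

    /// Same as `scan`, but only the keys.
    fn scan_keys(&self, keyspace: Keyspace) -> Vec<String> {
        self.scan(keyspace, None).into_iter().map(|(key, _)| key).collect()
    }

    /// Move a key from one keyspace to another; returns false if it is absent.
    fn mv(&mut self, from: Keyspace, to: Keyspace, key: &str) -> bool {
        match self.data.remove(&(from, key.to_string())) {
            Some(value) => {
                self.data.insert((to, key.to_string()), value);
                true
            }
            None => false,
        }
    }
}
```

One plausible use of `mv`, shown here only as an example, is moving a job's entry from an "active" keyspace to a "completed" one when the job finishes, so that a `scan` of active jobs stays small.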
   
   ## The Scheduler Flow
   
   Putting this all together, the conceptual flow in the scheduler works like so:
   
   ### Push Scheduling
   
   * Receive a set of task updates
   * Call `TaskManager::update_task_statuses` to apply updates and get back a set of `ExecutorReservation`s
   * Publish a `SchedulerServerEvent::Offer` event with these reservations
   * The event loop will receive this event and call `TaskManager::fill_reservations` to attempt to assign tasks to each reservation, giving priority to jobs which were being updated.
   * For assigned reservations, launch the corresponding tasks. 
   * For unassigned reservations, cancel (i.e. return the task slots to the pool)
   
   When a new job is submitted, we will try and reserve task slots up to the number of tasks in the job's initial stage and launch them. 
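
The push flow above can be sketched as a tiny state machine. Everything here is a simplification for illustration: `SchedulerEvent::Offer` stands in for `SchedulerServerEvent::Offer`, tasks are plain strings, the slot pool is a counter, and "launching" a task just records it. None of these names or shapes are taken from the PR's code.

```rust
use std::collections::VecDeque;

/// Hypothetical reserved slot on an executor.
struct Reservation {
    executor_id: String,
}

/// Stand-in for `SchedulerServerEvent::Offer`.
enum SchedulerEvent {
    Offer(Vec<Reservation>),
}

struct Scheduler {
    /// Tasks that are ready to run, in order.
    ready_tasks: VecDeque<String>,
    /// Stand-in for the shared pool of free task slots.
    free_slots: usize,
    /// `(executor_id, task)` pairs that were launched.
    launched: Vec<(String, String)>,
}

impl Scheduler {
    /// A task finished on `executor_id`: keep its slot as a reservation
    /// instead of returning it to the pool, and publish an offer.
    fn on_task_update(&self, executor_id: &str) -> SchedulerEvent {
        SchedulerEvent::Offer(vec![Reservation { executor_id: executor_id.to_string() }])
    }

    /// Event loop handler: fill each reservation or cancel it.
    fn on_event(&mut self, event: SchedulerEvent) {
        match event {
            SchedulerEvent::Offer(reservations) => {
                for r in reservations {
                    match self.ready_tasks.pop_front() {
                        // Assigned reservation: launch the task on that executor.
                        Some(task) => self.launched.push((r.executor_id, task)),
                        // Unassigned reservation: the slot goes back to the pool.
                        None => self.free_slots += 1,
                    }
                }
            }
        }
    }
}
```

Note that the pool (`free_slots`) is only written when a reservation cannot be filled, which is the contention-avoidance property described earlier.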
   
   ### Pull Scheduling
   * Receive a `PollWorkRequest`
   * Apply any task updates in the request using `TaskManager::update_task_statuses`
   * If the poller can accept a task, create a new reservation and call `TaskManager::fill_reservations` to try and fill it. 
   * Return a `PollWorkResponse` with the task that was assigned (if any). 
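
A comparable sketch for the pull flow is shown below. The request and response types borrow the names used in the steps above, but their fields (`executor_id`, `can_accept_task`, `finished_tasks`, `task`) are assumptions for this example, and task updates are reduced to a list of finished task IDs.

```rust
use std::collections::VecDeque;

/// Hypothetical request shape; field names are assumptions.
struct PollWorkRequest {
    executor_id: String,
    can_accept_task: bool,
    finished_tasks: Vec<String>,
}

/// Hypothetical response shape.
#[derive(Debug, PartialEq)]
struct PollWorkResponse {
    task: Option<String>,
}

#[derive(Default)]
struct Scheduler {
    ready: VecDeque<String>,
    completed: Vec<String>,
    /// `(executor_id, task)` pairs currently assigned.
    running: Vec<(String, String)>,
}

impl Scheduler {
    fn poll_work(&mut self, req: PollWorkRequest) -> PollWorkResponse {
        // Apply the task updates carried by the request first.
        self.completed.extend(req.finished_tasks);

        // Only create (and try to fill) a reservation if the poller has room.
        if !req.can_accept_task {
            return PollWorkResponse { task: None };
        }
        let task = self.ready.pop_front();
        if let Some(t) = &task {
            self.running.push((req.executor_id, t.clone()));
        }
        PollWorkResponse { task }
    }
}
```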
   
   # Are there any user-facing changes?
   
   This change is mostly to the internal mechanics of the scheduler. The only user-facing change is to the `StateBackendClient` trait. 
   
   
   Yes, the `StateBackendClient` contract is changed. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1155725304

   > Hi @thinkharderdev, just left a few comments.
   
   Hi @yahoNanJing I don't see any comments. Did you submit them?
   




[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1153735022

   > Thanks @thinkharderdev for the great work. The code is well implemented and well documented. And the task scheduling algorithm is also very sophisticated.
   > 
   > I only have three small concerns:
   > 
   > 1. For the stage-based task scheduling, suppose a scenario of 3 stages, stage3 ---> stage2 ---> stage1, with 1K tasks for stage1, 1 task for stage2, and 1K tasks for stage3. When stage2 finishes, it's better to schedule the tasks for stage3 all at once. However, for the algorithm based on the _ExecutorReservation_, it seems this kind of all-at-once scheduling only happens when the job is submitted. It may be better to keep the previous _StageFinished_ event.
   > 2. For every state change for a job, like a task status update, this _ExecutionGraph_-based implementation needs to fetch and decode the _ExecutionGraph_ from the config backend. Will it be too heavy, especially when there are thousands or millions of tasks for a job? The previous design of keeping the tasks in memory aims to reduce exactly this kind of cost. Therefore, I prefer not to persist the task status info into the config backend.
   > 3. For the job scheduling policy, this implementation makes it possible for one job to be scheduled by multiple schedulers. However, I think that's not necessary. It's better to employ an active-standby policy and to make recovery stage-level rather than task-level if the job's active scheduler terminates. Then we can avoid the _ExecutionGraph_ decoding cost for every task update.
   
   Thanks @yahoNanJing for the review.
   
   1. This is a good point. I wanted to avoid locking the executor state as much as possible but I see that the case you mentioned is a degenerate case. 
   2. I am concerned about this as well. For simplicity I put everything into a single data structure but conceptually there is no reason we have to do it that way. We can have the `ExecutionGraph` store only the DAG structure and store the plan information in a separate data structure. Since the plan is immutable that could be cached in memory more effectively. And I think you're right in that the entire task status doesn't need to be stored at all. We only need to know whether it is pending or not. 
   3. On this point I disagree. High availability is one goal of this work but another is horizontal scalability. The work involved in physical planning is sometimes non-trivial. For instance, scanning a large Hive table can involve reading parquet metadata from many files and it will be useful to be able to have multiple active schedulers to scale this work. 




[GitHub] [arrow-ballista] andygrove commented on a diff in pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#discussion_r889731148


##########
ballista/rust/core/proto/ballista.proto:
##########
@@ -622,6 +622,37 @@ enum JoinSide{
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 // Ballista Scheduling
 ///////////////////////////////////////////////////////////////////////////////////////////////////
+message TaskInputPartitions {
+  uint32 partition = 1;
+  repeated PartitionLocation partition_location = 2;
+}
+
+message GraphStageInput {
+  uint32 stage_id = 1;
+  repeated TaskInputPartitions partition_locations = 2;
+  bool complete = 3;
+}
+
+
+message ExecutionGraphStage {
+  uint64 stage_id = 1;
+  uint32 partitions = 2;
+  PhysicalHashRepartition output_partitioning = 3;
+  repeated  GraphStageInput inputs = 4;
+  bytes plan = 5;
+  repeated TaskStatus task_statuses = 6;
+  uint32 output_link = 7;
+  bool resolved = 8;
+}
+
+message ExecutionGraph {
+  string job_id = 1;
+  string session_id = 2;
+  JobStatus status = 3;
+  repeated ExecutionGraphStage stages = 4;

Review Comment:
   Got it. Yes, some comments in the structs here would be great.





[GitHub] [arrow-ballista] andygrove commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1185981481

   @thinkharderdev Looks like there is a merge conflict that needs resolving




[GitHub] [arrow-ballista] thinkharderdev commented on a diff in pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on code in PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#discussion_r889722012


##########
ballista/rust/core/proto/ballista.proto:
##########
@@ -622,6 +622,37 @@ enum JoinSide{
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 // Ballista Scheduling
 ///////////////////////////////////////////////////////////////////////////////////////////////////
+message TaskInputPartitions {
+  uint32 partition = 1;
+  repeated PartitionLocation partition_location = 2;
+}
+
+message GraphStageInput {
+  uint32 stage_id = 1;
+  repeated TaskInputPartitions partition_locations = 2;
+  bool complete = 3;
+}
+
+
+message ExecutionGraphStage {
+  uint64 stage_id = 1;
+  uint32 partitions = 2;
+  PhysicalHashRepartition output_partitioning = 3;
+  repeated  GraphStageInput inputs = 4;
+  bytes plan = 5;
+  repeated TaskStatus task_statuses = 6;
+  uint32 output_link = 7;
+  bool resolved = 8;
+}
+
+message ExecutionGraph {
+  string job_id = 1;
+  string session_id = 2;
+  JobStatus status = 3;
+  repeated ExecutionGraphStage stages = 4;

Review Comment:
   No, sorry, I should have explained that. I'll add better docs to this struct, but for now each stage has an `output_link: Option<usize>` which specifies where it sends its output. If `output_link` is `None` then the stage is final and it sends its output to the `ExecutionGraph`'s `output_locations`. Likewise, each stage has an `inputs: HashMap<usize,StageOutput>` which "collects" input locations from its input stages. 
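
As a rough sketch of how `output_link` and `inputs` could encode the DAG, consider the following. Only the names `output_link`, `inputs`, and `output_locations` come from the comment above; the `StageOutput` fields, the string-based partition locations, and the `stage_finished` / `is_resolved` methods are assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical record of what one input stage has produced so far.
#[derive(Debug, Default)]
struct StageOutput {
    partition_locations: Vec<String>,
    complete: bool,
}

struct Stage {
    /// Stage that consumes this stage's output; `None` for the final stage.
    output_link: Option<usize>,
    /// Outputs collected from input stages, keyed by input stage ID.
    inputs: HashMap<usize, StageOutput>,
}

struct Graph {
    stages: HashMap<usize, Stage>,
    /// Where the final stage's output ends up.
    output_locations: Vec<String>,
}

impl Graph {
    /// Propagate a finished stage's output along its `output_link`.
    fn stage_finished(&mut self, stage_id: usize, locations: Vec<String>) {
        let link = self.stages[&stage_id].output_link;
        match link {
            Some(next) => {
                let consumer = self.stages.get_mut(&next).expect("dangling output_link");
                let input = consumer.inputs.entry(stage_id).or_default();
                input.partition_locations.extend(locations);
                input.complete = true;
            }
            // Final stage: its output is the job's output.
            None => self.output_locations.extend(locations),
        }
    }

    /// A stage can run once every one of its inputs is complete.
    fn is_resolved(&self, stage_id: usize) -> bool {
        self.stages[&stage_id].inputs.values().all(|input| input.complete)
    }
}
```

In this encoding the edges of the DAG are implicit: each stage points forward through `output_link`, and each consumer knows its upstream stages through the keys of `inputs`.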





[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1154240883

   @yahoNanJing Made a small change in the event loop. It will now eagerly attempt to schedule additional pending tasks for a job on update. I think this should address your point from 1 above. 




[GitHub] [arrow-ballista] yahoNanJing commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1154024576

   Hi @thinkharderdev,
   > On this point I disagree. High availability is one goal of this work but another is horizontal scalability. The work involved in physical planning is sometimes non-trivial. For instance, scanning a large Hive table can involve reading parquet metadata from many files and it will be useful to be able to have multiple active schedulers to scale this work.
   
   Maybe I did not make my point clear. My point is that for any one job, there is always only one active scheduler, with the others on standby. However, different jobs can each have their own active scheduler. The physical plan generation for a query is dealt with in one scheduler, right?




[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1146851519

   cc @andygrove @yahoNanJing @Ted-Jiang 




[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1157451962

   > Hi @thinkharderdev, it's aside the code.
   
   I don't see any comments :)




[GitHub] [arrow-ballista] Ted-Jiang commented on a diff in pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#discussion_r889893816


##########
ballista/rust/scheduler/src/planner.rs:
##########
@@ -59,15 +58,14 @@ impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
     /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
-    pub async fn plan_query_stages<'a>(
+    pub fn plan_query_stages<'a>(

Review Comment:
   Is there some reason to remove the `async`? 🤔
   I think there is some IO work in `plan_query_stages`, like saving status in the DB.





[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1147728448

   Marking this as ready for review. I think the core functionality is covered by the existing unit tests and integration tests but if there is anything else where people feel more unit tests are required, please point it out :)
   
   Additionally, if there is any place where additional doc comments would be helpful please let me know. 




[GitHub] [arrow-ballista] thinkharderdev commented on a diff in pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on code in PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#discussion_r889978574


##########
ballista/rust/scheduler/src/planner.rs:
##########
@@ -59,15 +58,14 @@ impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
     /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
-    pub async fn plan_query_stages<'a>(
+    pub fn plan_query_stages<'a>(

Review Comment:
   I think it was doing that at one time, but the implementation now does no IO, so I changed it back to sync. 





[GitHub] [arrow-ballista] andygrove commented on pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1146874637

   I ran the integration tests (`./dev/integration-tests.sh`) and they ran without issue, so that gives me confidence that no regressions are introduced here.




[GitHub] [arrow-ballista] andygrove commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1185981050

   Thank you @thinkharderdev and @yahoNanJing !




[GitHub] [arrow-ballista] yahoNanJing commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1157334462

   Hi @thinkharderdev, it's aside the code.




[GitHub] [arrow-ballista] yahoNanJing commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1148245992

   Thanks @thinkharderdev. I'll review this PR this week.




[GitHub] [arrow-ballista] yahoNanJing commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1153417260

   Thanks @thinkharderdev for the great work. The code is well implemented and well documented. And the task scheduling algorithm is also very sophisticated.
   
   I only have three small concerns:
   1. For the stage-based task scheduling, when a stage finishes, it's better to schedule the tasks in its following stages all at once. However, for the algorithm based on the *ExecutorReservation*, it seems this kind of all-at-once scheduling only happens when the job is submitted. It may be better to keep the previous *StageFinished* event.
   2. For every state change for a job, like a task status update, this *ExecutionGraph*-based implementation needs to fetch and decode the *ExecutionGraph* from the config backend. Will it be too heavy, especially when there are thousands or millions of tasks for a job? The previous design of keeping the tasks in memory aims to reduce exactly this kind of cost. Therefore, I prefer not to persist the task status info into the config backend. 
   3. For the job scheduling policy, this implementation makes it possible for one job to be scheduled by multiple schedulers. However, I think that's not necessary. It's better to employ an active-standby policy and to make recovery stage-level rather than task-level if the job's active scheduler terminates. Then we can avoid the *ExecutionGraph* decoding cost for every task update.
   
   




[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1154063171

   > Hi @thinkharderdev,
   > 
   > > On this point I disagree. High availability is one goal of this work but another is horizontal scalability. The work involved in physical planning is sometimes non-trivial. For instance, scanning a large Hive table can involve reading parquet metadata from many files and it will be useful to be able to have multiple active schedulers to scale this work.
   > 
   > Maybe I did not make my point clear. My point is that for any one job, there is always only one active scheduler, with the others on standby. However, different jobs can each have their own active scheduler. The physical plan generation for a query is dealt with in one scheduler, right?
   
   Sorry, yes I think that makes sense. If each job was "owned" by a single scheduler then we could avoid a lot of overhead and synchronization on the DAG state.




[GitHub] [arrow-ballista] yahoNanJing commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1154642089

   Hi @thinkharderdev, just left a few comments.




[GitHub] [arrow-ballista] andygrove commented on a diff in pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#discussion_r889718557


##########
ballista/rust/core/proto/ballista.proto:
##########
@@ -622,6 +622,37 @@ enum JoinSide{
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 // Ballista Scheduling
 ///////////////////////////////////////////////////////////////////////////////////////////////////
+message TaskInputPartitions {
+  uint32 partition = 1;
+  repeated PartitionLocation partition_location = 2;
+}
+
+message GraphStageInput {
+  uint32 stage_id = 1;
+  repeated TaskInputPartitions partition_locations = 2;
+  bool complete = 3;
+}
+
+
+message ExecutionGraphStage {
+  uint64 stage_id = 1;
+  uint32 partitions = 2;
+  PhysicalHashRepartition output_partitioning = 3;
+  repeated  GraphStageInput inputs = 4;
+  bytes plan = 5;
+  repeated TaskStatus task_statuses = 6;
+  uint32 output_link = 7;
+  bool resolved = 8;
+}
+
+message ExecutionGraph {
+  string job_id = 1;
+  string session_id = 2;
+  JobStatus status = 3;
+  repeated ExecutionGraphStage stages = 4;

Review Comment:
   It isn't clear to me how these stages form a DAG. Are the dependencies between stages stored elsewhere?





[GitHub] [arrow-ballista] yahoNanJing commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1153418376

   For the concerns mentioned above, I think they can be improved one by one in the future and this PR can be merged first.




[GitHub] [arrow-ballista] yahoNanJing commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1183944016

   Thanks @thinkharderdev. Sorry for the late response. I think we can merge this PR first to avoid dealing with conflicts repeatedly. Then we can continue refining it step by step.




[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1176753857

   @andygrove @yahoNanJing What do we think of this PR? 




[GitHub] [arrow-ballista] thinkharderdev commented on pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1147256298

   > @thinkharderdev wow, a lot of changes 👍. I have some questions: Do the executor and scheduler have a one-to-one relationship? If it is one-to-multi, how is each executor's available slot count kept current in every scheduler? If each SQL request needs a state update from all executors, is there a better way? I think this is a problem we should consider 😊
   
   No, it is one-to-multi (or really multi-to-multi in principle). The available executor slots are stored in the state backend (either etcd or Sled). However, when an executor publishes a task status update, the particular scheduler that receives the request can reassign the freed executor slots without reading the backend state, because those slots haven't been returned to the pool yet. 
   
   The session issue is interesting. For now, we just save the session properties in the backend state, and whenever we need a `SessionContext` for a session ID, we read the properties and create a new `SessionContext`. This can probably be optimized to use a `Watch` and an in-memory session registry, but I kept it simple for now. 
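
   As a rough illustration of the in-memory session registry idea mentioned above, here is a minimal sketch using only the standard library. `SessionCtx` and `SessionRegistry` are hypothetical stand-ins (not the real `SessionContext` or anything in this PR), and the `load_properties` closure stands in for reading the stored properties from the state backend. The `invalidate` method is where a watch event handler could plug in.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a session context built from stored properties.
#[derive(Debug, PartialEq)]
pub struct SessionCtx {
    pub session_id: String,
    pub properties: HashMap<String, String>,
}

/// In-memory registry that builds a context once per session ID and reuses it.
#[derive(Default)]
pub struct SessionRegistry {
    sessions: RwLock<HashMap<String, Arc<SessionCtx>>>,
}

impl SessionRegistry {
    /// Returns the cached context, or builds one from `load_properties`
    /// (assumed here to read from the state backend) on a cache miss.
    pub fn get_or_create<F>(&self, session_id: &str, load_properties: F) -> Arc<SessionCtx>
    where
        F: FnOnce() -> HashMap<String, String>,
    {
        if let Some(ctx) = self.sessions.read().unwrap().get(session_id) {
            return Arc::clone(ctx);
        }
        let mut sessions = self.sessions.write().unwrap();
        // `entry` re-checks under the write lock, so a concurrent insert wins.
        Arc::clone(sessions.entry(session_id.to_string()).or_insert_with(|| {
            Arc::new(SessionCtx {
                session_id: session_id.to_string(),
                properties: load_properties(),
            })
        }))
    }

    /// Drops a cached entry, e.g. when a watch event reports changed properties.
    pub fn invalidate(&self, session_id: &str) {
        self.sessions.write().unwrap().remove(session_id);
    }
}
```

   The trade-off is the usual one for caches: lookups avoid a backend read, but correctness then depends on invalidation being delivered to every scheduler.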




[GitHub] [arrow-ballista] Ted-Jiang commented on pull request #59: [Draft] Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#issuecomment-1147134734

   @thinkharderdev wow, a lot of changes 👍. I have some questions:
   Do the executor and scheduler have a one-to-one relationship?
   If it is one-to-multi, how is each executor's available slot count kept current in every scheduler? If each SQL request needs a state update from all executors, is there a better way?
   I think this is a problem we should consider 😊
   




[GitHub] [arrow-ballista] andygrove merged pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
andygrove merged PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59




[GitHub] [arrow-ballista] yahoNanJing commented on a diff in pull request #59: Support for multi-scheduler deployments

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #59:
URL: https://github.com/apache/arrow-ballista/pull/59#discussion_r896317299


##########
ballista/rust/scheduler/src/scheduler_server/event_loop.rs:
##########
@@ -105,9 +105,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 .executor_manager
                 .cancel_reservations(free_list)
                 .await?;
+            Ok(None)
+        } else if pending_tasks > 0 {

Review Comment:
   Why do we need *else* here? ❓ 



##########
ballista/rust/scheduler/src/state/executor_manager.rs:
##########
@@ -15,43 +15,354 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
-use ballista_core::serde::protobuf::ExecutorHeartbeat;
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange};
-use log::{error, info, warn};
+use crate::state::backend::{Keyspace, StateBackendClient, WatchEvent};
+
+use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::protobuf;
+
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use futures::StreamExt;
+use log::{debug, info};
 use parking_lot::RwLock;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
+/// Represents a task slot that is reserved (i.e. available for scheduling but not visible to the
+/// rest of the system).
+/// When tasks finish we want to preferentially assign new tasks from the same job, so the reservation
+/// can already be assigned to a particular job ID. In that case, the scheduler will try to schedule
+/// available tasks for that job to the reserved task slot.
+#[derive(Clone, Debug)]
+pub struct ExecutorReservation {
+    pub executor_id: String,
+    pub job_id: Option<String>,
+}
+
+impl ExecutorReservation {
+    pub fn new_free(executor_id: String) -> Self {
+        Self {
+            executor_id,
+            job_id: None,
+        }
+    }
+
+    pub fn new_assigned(executor_id: String, job_id: String) -> Self {
+        Self {
+            executor_id,
+            job_id: Some(job_id),
+        }
+    }
+
+    pub fn assign(mut self, job_id: String) -> Self {
+        self.job_id = Some(job_id);
+        self
+    }
+
+    pub fn assigned(&self) -> bool {
+        self.job_id.is_some()
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct ExecutorManager {
-    executors_heartbeat: Arc<RwLock<HashMap<String, ExecutorHeartbeat>>>,
-    executors_data: Arc<RwLock<HashMap<String, ExecutorData>>>,
+    state: Arc<dyn StateBackendClient>,
+    executor_metadata: Arc<RwLock<HashMap<String, ExecutorMetadata>>>,
+    executors_heartbeat: Arc<RwLock<HashMap<String, protobuf::ExecutorHeartbeat>>>,
 }
 
 impl ExecutorManager {
-    pub(crate) fn new() -> Self {
+    pub(crate) fn new(state: Arc<dyn StateBackendClient>) -> Self {
         Self {
+            state,
+            executor_metadata: Arc::new(RwLock::new(HashMap::new())),
             executors_heartbeat: Arc::new(RwLock::new(HashMap::new())),
-            executors_data: Arc::new(RwLock::new(HashMap::new())),
         }
     }
 
-    pub(crate) fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) {
+    /// Initialize the `ExecutorManager` state. This will fill the `executors_heartbeat` map
+    /// with existing heartbeats. New updates are then consumed through the `ExecutorHeartbeatListener`.
+    pub async fn init(&self) -> Result<()> {
+        self.init_executor_heartbeats().await?;
+        let heartbeat_listener = ExecutorHeartbeatListener::new(
+            self.state.clone(),
+            self.executors_heartbeat.clone(),
+        );
+        heartbeat_listener.start().await
+    }
+
+    /// Reserve up to n executor task slots. Once reserved these slots will not be available
+    /// for scheduling.
+    /// This operation is atomic, so if this method returns an Err, no slots have been reserved.
+    pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
+        let lock = self.state.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            debug!("Attempting to reserve {} executor slots", n);
+            let start = Instant::now();
+            let mut reservations: Vec<ExecutorReservation> = vec![];
+            let mut desired: u32 = n;
+
+            let alive_executors = self.get_alive_executors_within_one_minute();
+
+            let mut txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
+
+            for executor_id in alive_executors {
+                let value = self.state.get(Keyspace::Slots, &executor_id).await?;
+                let mut data =
+                    decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
+                let take = std::cmp::min(data.available_task_slots, desired);
+
+                for _ in 0..take {

Review Comment:
   This is a different policy from the previous round-robin one. I'm not sure whether it would be better for the tasks to be scheduled evenly across the executors.
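
   To illustrate the difference between the two policies, here is a minimal sketch that works on a plain list of `(executor_id, available_slots)` pairs. Both function names are hypothetical and neither is the PR's implementation. The greedy variant mirrors the shape of the loop under review, and the round-robin variant is one possible reading of the previous behaviour.

```rust
/// Greedy: drain each executor in turn before moving to the next one.
pub fn reserve_greedy(available: &[(String, u32)], n: u32) -> Vec<String> {
    let mut desired = n;
    let mut reserved = Vec::new();
    for (executor_id, slots) in available {
        let take = (*slots).min(desired);
        for _ in 0..take {
            reserved.push(executor_id.clone());
        }
        desired -= take;
        if desired == 0 {
            break;
        }
    }
    reserved
}

/// Round-robin: take at most one slot per executor on each pass.
pub fn reserve_round_robin(available: &[(String, u32)], n: u32) -> Vec<String> {
    let mut remaining: Vec<u32> = available.iter().map(|(_, slots)| *slots).collect();
    let mut reserved = Vec::new();
    while (reserved.len() as u32) < n {
        let mut progressed = false;
        for (i, (executor_id, _)) in available.iter().enumerate() {
            if reserved.len() as u32 == n {
                break;
            }
            if remaining[i] > 0 {
                remaining[i] -= 1;
                reserved.push(executor_id.clone());
                progressed = true;
            }
        }
        // Stop once every executor is exhausted, even if fewer than n were reserved.
        if !progressed {
            break;
        }
    }
    reserved
}
```

   With two executors of three slots each and four requested slots, the greedy variant places three reservations on the first executor and one on the second, while the round-robin variant places two on each.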



##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -0,0 +1,974 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::DistributedPlanner;
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
+
+use ballista_core::serde::protobuf::{
+    self, CompletedJob, JobStatus, QueuedJob, TaskStatus,
+};
+use ballista_core::serde::protobuf::{job_status, FailedJob, ShuffleWritePartition};
+use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::scheduler::{
+    ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
+};
+use datafusion::physical_plan::{
+    accept, ExecutionPlan, ExecutionPlanVisitor, Partitioning,
+};
+use log::debug;
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use std::sync::Arc;
+
+/// This data structure collects the partition locations for an `ExecutionStage`.
+/// Each `ExecutionStage` will hold a `StageOutput` for each of its child stages.
+/// When all tasks for the child stage are complete, it will mark the `StageOutput` as complete.
+#[derive(Clone, Debug, Default)]
+pub struct StageOutput {
+    /// Map from partition -> partition locations
+    pub(crate) partition_locations: HashMap<usize, Vec<PartitionLocation>>,
+    /// Flag indicating whether all tasks are complete
+    pub(crate) complete: bool,
+}
+
+impl StageOutput {
+    pub fn new() -> Self {
+        Self {
+            partition_locations: HashMap::new(),
+            complete: false,
+        }
+    }
+
+    /// Add a `PartitionLocation` to the `StageOutput`
+    pub fn add_partition(&mut self, partition_location: PartitionLocation) {
+        if let Some(parts) = self
+            .partition_locations
+            .get_mut(&partition_location.partition_id.partition_id)
+        {
+            parts.push(partition_location)
+        } else {
+            self.partition_locations.insert(
+                partition_location.partition_id.partition_id,
+                vec![partition_location],
+            );
+        }
+    }
+
+    pub fn is_complete(&self) -> bool {
+        self.complete
+    }
+}
+
+/// A stage in the ExecutionGraph.
+///
+/// This represents a set of tasks (one per each `partition`) which can
+/// be executed concurrently.
+#[derive(Clone)]
+pub struct ExecutionStage {
+    /// Stage ID
+    pub(crate) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(crate) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(crate) output_partitioning: Option<Partitioning>,
+    /// Represents the outputs from this stage's child stages.
+    /// This stage can only be resolved and executed once all child stages are completed.
+    pub(crate) inputs: HashMap<usize, StageOutput>,
+    /// `ExecutionPlan` for this stage
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the partition has not yet been scheduled
+    pub(crate) task_statuses: Vec<Option<task_status::Status>>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_link` is `None` then this is the final stage in the `ExecutionGraph`
+    pub(crate) output_link: Option<usize>,
+    /// Flag indicating whether all input partitions have been resolved and the plan
+    /// has UnresolvedShuffleExec operators resolved to ShuffleReadExec operators.
+    pub(crate) resolved: bool,
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let scheduled_tasks = self.task_statuses.iter().filter(|t| t.is_some()).count();
+
+        write!(
+            f,
+            "Stage[id={}, partitions={:?}, children={}, completed_tasks={}, resolved={}, scheduled_tasks={}, available_tasks={}]\nInputs{:?}\n\n{}",
+            self.stage_id,
+            self.partitions,
+            self.inputs.len(),
+            self.completed_tasks(),
+            self.resolved,
+            scheduled_tasks,
+            self.available_tasks(),
+            self.inputs,
+            plan
+        )
+    }
+}
+
+impl ExecutionStage {
+    pub fn new(
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        output_partitioning: Option<Partitioning>,
+        output_link: Option<usize>,
+        child_stages: Vec<usize>,
+    ) -> Self {
+        let num_tasks = plan.output_partitioning().partition_count();
+
+        let resolved = child_stages.is_empty();
+
+        let mut inputs: HashMap<usize, StageOutput> = HashMap::new();
+
+        for input_stage_id in &child_stages {
+            inputs.insert(*input_stage_id, StageOutput::new());
+        }
+
+        Self {
+            stage_id,
+            partitions: num_tasks,
+            output_partitioning,
+            inputs,
+            plan,
+            task_statuses: vec![None; num_tasks],
+            output_link,
+            resolved,
+        }
+    }
+
+    /// Returns true if all inputs are complete and we can resolve all
+    /// UnresolvedShuffleExec operators to ShuffleReadExec
+    pub fn resolvable(&self) -> bool {
+        self.inputs.iter().all(|(_, outputs)| outputs.is_complete())
+    }
+
+    /// Returns `true` if all tasks for this stage are complete
+    pub fn complete(&self) -> bool {
+        self.task_statuses
+            .iter()
+            .all(|status| matches!(status, Some(task_status::Status::Completed(_))))
+    }
+
+    /// Returns the number of completed tasks
+    pub fn completed_tasks(&self) -> usize {
+        self.task_statuses
+            .iter()
+            .filter(|status| matches!(status, Some(task_status::Status::Completed(_))))
+            .count()
+    }
+
+    /// Marks the input stage ID as complete.
+    pub fn complete_input(&mut self, stage_id: usize) {
+        if let Some(input) = self.inputs.get_mut(&stage_id) {
+            input.complete = true;
+        }
+    }
+
+    /// Returns true if the stage plan has all UnresolvedShuffleExec operators resolved to
+    /// ShuffleReadExec
+    pub fn resolved(&self) -> bool {
+        self.resolved
+    }
+
+    /// Returns the number of tasks in this stage which are available for scheduling.
+    /// If the stage is not yet resolved, then this will return `0`, otherwise it will
+    /// return the number of tasks where the task status is not yet set.
+    pub fn available_tasks(&self) -> usize {
+        if self.resolved {
+            self.task_statuses.iter().filter(|s| s.is_none()).count()
+        } else {
+            0
+        }
+    }
+
+    /// Resolve any UnresolvedShuffleExec operators within this stage's plan
+    pub fn resolve_shuffles(&mut self) -> Result<()> {
+        debug!("Resolving shuffles\n{:?}", self);
+        if self.resolved {
+            // If this stage has no input shuffles, then it is already resolved
+            Ok(())
+        } else {
+            let input_locations = self
+                .inputs
+                .iter()
+                .map(|(stage, outputs)| (*stage, outputs.partition_locations.clone()))
+                .collect();
+            // Otherwise, rewrite the plan to replace UnresolvedShuffleExec with ShuffleReadExec
+            let new_plan = crate::planner::remove_unresolved_shuffles(
+                self.plan.clone(),
+                &input_locations,
+            )?;
+            self.plan = new_plan;
+            self.resolved = true;
+            Ok(())
+        }
+    }
+
+    /// Update the status of the task for the given partition
+    pub fn update_task_status(&mut self, partition: usize, status: task_status::Status) {
+        debug!("Updating task status for partition {}", partition);
+        self.task_statuses[partition] = Some(status);
+    }
+
+    /// Add input partitions published from an input stage.
+    pub fn add_input_partitions(
+        &mut self,
+        stage_id: usize,
+        _partition_id: usize,
+        locations: Vec<PartitionLocation>,
+    ) -> Result<()> {
+        if let Some(stage_inputs) = self.inputs.get_mut(&stage_id) {
+            for partition in locations {
+                stage_inputs.add_partition(partition);
+            }
+        } else {
+            return Err(BallistaError::Internal(format!("Error adding input partitions to stage {}, {} is not a valid child stage ID", self.stage_id, stage_id)));
+        }
+
+        Ok(())
+    }
+}
+
+/// Utility for building a set of `ExecutionStage`s from
+/// a list of `ShuffleWriterExec`.
+///
+/// This will infer the dependency structure for the stages
+/// so that we can construct a DAG from the stages.
+struct ExecutionStageBuilder {
+    /// Stage ID which is currently being visited
+    current_stage_id: usize,
+    /// Map from stage ID -> List of child stage IDs
+    stage_dependencies: HashMap<usize, Vec<usize>>,
+    /// Map from Stage ID -> output link
+    output_links: HashMap<usize, usize>,

Review Comment:
   One stage may be the input of multiple sub-stages. Therefore, the data structure of output_links should be *HashMap<usize, Vec<usize>>*
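
   A minimal sketch of what the suggested shape could look like, assuming the links are derived by inverting the `stage_dependencies` map (stage ID -> child stage IDs). The free function `output_links` is hypothetical and only illustrates the data structure. It is not the builder code from the PR.

```rust
use std::collections::HashMap;

/// Inverts "stage ID -> child stage IDs" into "stage ID -> consuming stage IDs",
/// so a stage whose output feeds several stages keeps every consumer.
pub fn output_links(
    stage_dependencies: &HashMap<usize, Vec<usize>>,
) -> HashMap<usize, Vec<usize>> {
    let mut links: HashMap<usize, Vec<usize>> = HashMap::new();
    for (consumer, children) in stage_dependencies {
        for child in children {
            links.entry(*child).or_default().push(*consumer);
        }
    }
    // HashMap iteration order is unspecified, so normalise each consumer list.
    for consumers in links.values_mut() {
        consumers.sort_unstable();
        consumers.dedup();
    }
    links
}
```

   For example, if stages 3 and 4 both read from stage 1, the result maps stage 1 to `[3, 4]`, which a `HashMap<usize, usize>` could not represent.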



##########
ballista/rust/scheduler/src/scheduler_server/event_loop.rs:
##########
@@ -60,13 +60,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         &self,
         reservations: Vec<ExecutorReservation>,
     ) -> Result<Option<SchedulerServerEvent>> {
-        let free_list = match self
+        let (free_list, pending_tasks) = match self
             .state
             .task_manager
             .fill_reservations(&reservations)
             .await
         {
-            Ok((assignments, mut unassigned_reservations)) => {
+            Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
                 for (executor_id, task) in assignments.into_iter() {

Review Comment:
   Would it be better to group the tasks by executor so that all tasks for the same executor can be launched in a single call?
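
   A minimal sketch of the grouping step, assuming the assignments arrive as `(executor_id, task)` pairs as in the loop above. `group_by_executor` is a hypothetical helper, and the task type is left generic because the real task definition is not shown here.

```rust
use std::collections::BTreeMap;

/// Groups `(executor_id, task)` pairs so each executor appears once,
/// keeping the tasks in their original relative order.
pub fn group_by_executor<T>(assignments: Vec<(String, T)>) -> BTreeMap<String, Vec<T>> {
    let mut grouped: BTreeMap<String, Vec<T>> = BTreeMap::new();
    for (executor_id, task) in assignments {
        grouped.entry(executor_id).or_default().push(task);
    }
    grouped
}
```

   The caller could then iterate over the map and issue one launch request per executor with the whole batch, instead of one request per task.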


