Posted to github@arrow.apache.org by "smallzhongfeng (via GitHub)" <gi...@apache.org> on 2023/06/05 09:21:54 UTC

[GitHub] [arrow-ballista] smallzhongfeng opened a new issue, #803: [Problem] How to deploy multiple schedulers on cluster but not docker

smallzhongfeng opened a new issue, #803:
URL: https://github.com/apache/arrow-ballista/issues/803

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-ballista] thinkharderdev commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "thinkharderdev (via GitHub)" <gi...@apache.org>.
thinkharderdev commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1628653290

   > ```
   > local:
   > INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 588.435µs 
   > etcd:
   > INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 77.631762ms
   > ```
   > 
   > I have been testing etcd as the storage backend, and its performance is very poor, especially when reserving and releasing resources. These operations take much longer than in local storage mode. I suspect the cause is the global distributed lock. Do you have any suggestions? @thinkharderdev @avantgardnerio
   
   Yes, we found the same with etcd. If you don't need HA, I would suggest using a single scheduler with in-memory state. If you need HA, what we did is implement `ClusterState` backed by Redis. We haven't found the time to upstream the implementation yet, but it's relatively simple. I will try to find some time to do that soon. The gist is that you use a Redis hash to hold the free task slots (a map `executor_id -> task_slots`) and a Lua script for atomic reservation and freeing. Something roughly like:
   
   ```rust
   const RESERVATION_SCRIPT: &str = r#"
   local desired_slots = tonumber(ARGV[1])
   local s = {}
   for i=2, #ARGV do
       local exists = redis.call('HEXISTS', KEYS[1], ARGV[i])
       if( exists == 1 ) then
           local value = redis.call('HGET', KEYS[1], ARGV[i])
           local slots = tonumber(value)
           if( slots >= desired_slots ) then
               s[i - 1] = desired_slots
               local inc = -desired_slots
               redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
               desired_slots = 0
           elseif slots == 0 then
               s[i - 1] = 0
           else
               s[i - 1] = slots
               local inc = -slots
               redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
               desired_slots = desired_slots - slots
           end
       else
           s[i - 1] = 0
       end
   
       if( desired_slots <= 0 ) then
           break
       end
   end
   return cjson.encode(s)
   "#;
   
   const CANCEL_SCRIPT: &str = r#"
   local cancelled = 0
   for i=2, #KEYS do
       local exists = redis.call('HEXISTS', KEYS[1], KEYS[i])
       if( exists == 1 ) then
           local inc = tonumber(ARGV[i - 1])
           redis.call('HINCRBY', KEYS[1], KEYS[i], inc)
           cancelled = cancelled + inc
       end
   end
   return cancelled
   "#;
   
   const SLOTS_KEY: &str = "task-slots";
   
   impl ClusterState for MyRedisState {
       async fn reserve_slots(
           &self,
           num_slots: u32,
           _distribution: TaskDistribution,
           executors: Option<HashSet<String>>,
       ) -> Result<Vec<ExecutorReservation>> {
           if num_slots == 0 {
               return Ok(vec![]);
           }
   
           if let Some(executors) = executors {
               let mut connection = self.get_connection().await?;
   
               let script = Script::new(RESERVATION_SCRIPT);
   
               let mut script = script.key(SLOTS_KEY);
               script.arg(num_slots);
   
               if !executors.is_empty() {
                   let executor_ids: Vec<String> = executors.into_iter().collect();
                   for executor_id in &executor_ids {
                       script.arg(executor_id);
                   }
   
                   // Run the Lua script atomically on the Redis server.
                   let result: String = match script.invoke_async(&mut connection).await {
                       Ok(result) => result,
                       Err(e) => return Err(into_ballista_error(e)),
                   };
   
                   let reservations = serde_json::from_str::<Vec<u32>>(&result).map_err(|e| {
                       BallistaError::General(format!(
                           "Error executing reservations, unexpected response from redis: {e:?}"
                       ))
                   })?;
   
                   let reservations: Vec<ExecutorReservation> = executor_ids
                       .into_iter()
                       .zip(reservations)
                       .flat_map(|(id, reserved)| {
                           (0..reserved).map(move |_| ExecutorReservation::new_free(id.clone()))
                       })
                       .collect();
   
                   return Ok(reservations);
               }
           }
   
           Ok(vec![])
       }
   
       async fn cancel_reservations(&self, reservations: Vec<ExecutorReservation>) -> Result<()> {
           let mut connection = self.get_connection().await?;
   
           if !reservations.is_empty() {
               let script = Script::new(CANCEL_SCRIPT);
               let mut script = script.key(SLOTS_KEY);
   
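               // Count slots per executor. A HashMap fold is used instead of itertools'
               // `group_by`, which only merges consecutive items with equal keys.
               let reservations = reservations.into_iter().fold(
                   HashMap::<String, usize>::new(),
                   |mut counts, r| {
                       *counts.entry(r.executor_id).or_insert(0) += 1;
                       counts
                   },
               );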
   
               for (executor_id, slots) in reservations {
                   script.key(executor_id);
                   script.arg(slots);
               }
   
               let cancelled: u64 = match script.invoke_async(&mut connection).await {
                   Ok(cancelled) => {
                       cancelled
                   }
                   Err(e) => {
                       return Err(into_ballista_error(e));
                   }
               };
   
               debug!("Cancelled {} reservations", cancelled);
   
               Ok(())
           } else {
               Ok(())
           }
       }
   }
   
   ```
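
   To make the reservation and cancel scripts easier to follow, here is a minimal in-memory model of the same logic in plain Rust. It is only a sketch: `Slots`, `reserve_bias` and `cancel` are hypothetical names, a `HashMap` stands in for the Redis hash, and the atomicity that the Lua scripts get from running inside Redis is not reproduced.

   ```rust
   use std::collections::HashMap;

   /// Hypothetical in-memory stand-in for the Redis hash `executor_id -> free task slots`.
   type Slots = HashMap<String, u32>;

   /// Mirrors RESERVATION_SCRIPT: walk the executors in order and take as many
   /// slots as possible from each one until `desired` slots have been reserved.
   /// Returns one entry per executor visited, like the array the Lua script encodes.
   fn reserve_bias(slots: &mut Slots, mut desired: u32, executors: &[&str]) -> Vec<u32> {
       let mut reserved = Vec::new();
       for id in executors {
           // Unknown executors contribute zero slots (the HEXISTS check in Lua).
           let taken = match slots.get_mut(*id) {
               Some(free) => {
                   let n = (*free).min(desired);
                   *free -= n; // HINCRBY with a negative increment
                   n
               }
               None => 0,
           };
           reserved.push(taken);
           desired -= taken;
           if desired == 0 {
               break;
           }
       }
       reserved
   }

   /// Mirrors CANCEL_SCRIPT: give slots back to executors that still exist and
   /// return how many slots were actually released.
   fn cancel(slots: &mut Slots, returned: &[(&str, u32)]) -> u32 {
       let mut cancelled = 0;
       for (id, n) in returned {
           if let Some(free) = slots.get_mut(*id) {
               *free += *n;
               cancelled += *n;
           }
       }
       cancelled
   }
   ```

   Because the loop drains the first executor before moving to the next, the result is biased towards executors that appear early in the list.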
   
   Note that this only supports `TaskDistribution::Bias`. Round-robin task distribution would need a different Lua script (which we have not implemented), but it could be done in principle.
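
   For comparison, a round-robin policy would hand out one slot per executor per pass instead of draining the first executor. The following is a minimal in-memory sketch of that idea in Rust, not Ballista's round-robin implementation and not a Redis script; `reserve_round_robin` is a hypothetical name, and a real version would still have to run atomically on the Redis side.

   ```rust
   use std::collections::HashMap;

   /// Hypothetical round-robin reservation over an in-memory slot table.
   /// Takes one slot from each executor per pass until `desired` slots are
   /// reserved or no executor has a free slot left.
   /// Returns the number of slots taken from each executor, in input order.
   fn reserve_round_robin(
       slots: &mut HashMap<String, u32>,
       mut desired: u32,
       executors: &[&str],
   ) -> Vec<u32> {
       let mut reserved = vec![0u32; executors.len()];
       while desired > 0 {
           let mut progressed = false;
           for (i, id) in executors.iter().enumerate() {
               if desired == 0 {
                   break;
               }
               if let Some(free) = slots.get_mut(*id) {
                   if *free > 0 {
                       *free -= 1;
                       reserved[i] += 1;
                       desired -= 1;
                       progressed = true;
                   }
               }
           }
           // Stop when a full pass found no free slot anywhere.
           if !progressed {
               break;
           }
       }
       reserved
   }
   ```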
   
   




[GitHub] [arrow-ballista] thinkharderdev commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "thinkharderdev (via GitHub)" <gi...@apache.org>.
thinkharderdev commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1593393843

   > If we use the `push` policy, will the task fail when the scheduler switches? Or will the running tasks before the switch fail? @thinkharderdev
   
   Yes. Currently the active job state is stored in memory on the scheduler, so if the scheduler shuts down or restarts before the job completes, the job will fail. Once it receives a `SIGTERM`, the scheduler will try to wait for all its active jobs to complete before it terminates, but for long-running jobs that may not be feasible.
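
   The shutdown behaviour described here amounts to a bounded wait. As a rough sketch only (this is not the Ballista scheduler's code; `drain_active_jobs` and the polling closure are assumptions made for illustration), a drain loop with a grace period could look like this in Rust:

   ```rust
   use std::thread;
   use std::time::{Duration, Instant};

   /// Hypothetical drain loop: after a shutdown signal, poll the number of
   /// active jobs until it reaches zero or the grace period runs out.
   /// Returns `true` if every job finished in time, `false` if jobs remain
   /// (those jobs would be lost, because their state lives only in memory).
   fn drain_active_jobs<F>(mut active_jobs: F, grace: Duration, poll: Duration) -> bool
   where
       F: FnMut() -> usize,
   {
       let deadline = Instant::now() + grace;
       loop {
           if active_jobs() == 0 {
               return true;
           }
           if Instant::now() >= deadline {
               return false;
           }
           thread::sleep(poll);
       }
   }
   ```

   On Kubernetes, the grace period would have to fit inside the pod's `terminationGracePeriodSeconds`, otherwise the process is killed before the loop finishes.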
   
   




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on cluster but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1576444941

   PTAL @thinkharderdev @andygrove @yahoNanJing 




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1592886794

   Thanks for your patient reply. I got it. 👍 
   But I still have a question. If we use the `push` policy, will the task fail when the scheduler switches? Or will the running tasks before the switch fail?




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1576707606

   Could we have some docs for this ?
   ![image](https://github.com/apache/arrow-ballista/assets/84573424/e00d2a21-64d2-4167-952e-45b0fce6d6a7)
   




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1584357927

   So if I want to deploy multi-scheduler, should I change this ? Does this provide `HA` capabilities?
   ```
   metadata:
     name: ballista-scheduler
   spec:
     replicas: 2
   ```




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1579839812

   Could someone give me some advice, please?




[GitHub] [arrow-ballista] thinkharderdev commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "thinkharderdev (via GitHub)" <gi...@apache.org>.
thinkharderdev commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1585613312

   > I just want to be able to deploy multiple schedulers to ensure high availability of the scheduler. @thinkharderdev
   
   So you have two options:
   
   1. Out-of-the-box support for HA schedulers. As @avantgardnerio mentioned, as long as you configure a storage backend that can be shared between the schedulers, this should work without further changes. There is already a storage backend implemented with etcd that you can use, and implementing a custom backend is relatively straightforward if you want to use some other DB or KV store. However, the shared storage and distributed locking can add a significant amount of overhead.
   2. If you need high throughput on task scheduling then you can implement an API layer in front of the scheduler that can route calls to the correct scheduler and then have schedulers use only in-memory state. The API layer would need to know which scheduler "owns" each query and route status requests to the correct scheduler. 
   
   Option 2 is what we have done in our deployment. We have multiple schedulers, each using an in-memory `JobState`, and an API layer in front which routes calls to the appropriate scheduler. We also use a shared `ClusterState` based on Redis (not yet upstreamed, but it is relatively straightforward to implement). This gives all the schedulers a consistent view of the executor task slots and, with a little bit of Redis server-side scripting, doesn't require any distributed locks.
   
   One downside of this approach is that the job state is volatile, so if a scheduler dies then all jobs running on it are lost. If you are running relatively short-duration queries, this is not a huge issue (at least for us), since the scheduler will try to complete any in-flight jobs before it shuts down. You can set up your deployment so that the schedulers have a shutdown grace period sufficient to complete any outstanding work.
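
   The routing layer in option 2 mainly has to remember which scheduler accepted each job. Here is a minimal sketch of that bookkeeping in Rust, assuming a hypothetical `JobRouter` type, round-robin placement of new jobs, and scheduler addresses given as plain strings. None of these names are part of Ballista.

   ```rust
   use std::collections::HashMap;

   /// Hypothetical routing table for an API layer in front of several schedulers.
   struct JobRouter {
       schedulers: Vec<String>,
       next: usize,
       owners: HashMap<String, String>, // job_id -> scheduler address
   }

   impl JobRouter {
       fn new(schedulers: Vec<String>) -> Self {
           Self { schedulers, next: 0, owners: HashMap::new() }
       }

       /// Pick a scheduler for a new job (round-robin) and record ownership.
       /// Returns `None` when no scheduler is left.
       fn submit(&mut self, job_id: &str) -> Option<String> {
           if self.schedulers.is_empty() {
               return None;
           }
           let addr = self.schedulers[self.next % self.schedulers.len()].clone();
           self.next = self.next.wrapping_add(1);
           self.owners.insert(job_id.to_string(), addr.clone());
           Some(addr)
       }

       /// Status requests must go to the scheduler that owns the job.
       fn owner(&self, job_id: &str) -> Option<&str> {
           self.owners.get(job_id).map(String::as_str)
       }

       /// When a scheduler dies, its in-memory jobs are lost: drop the scheduler,
       /// forget its jobs, and return their ids (sorted) so callers can resubmit.
       fn scheduler_lost(&mut self, addr: &str) -> Vec<String> {
           self.schedulers.retain(|s| s.as_str() != addr);
           let mut lost: Vec<String> = self
               .owners
               .iter()
               .filter(|(_, owner)| owner.as_str() == addr)
               .map(|(job, _)| job.clone())
               .collect();
           lost.sort();
           for job in &lost {
               self.owners.remove(job);
           }
           lost
       }
   }
   ```

   In a real deployment the ownership table itself would need to survive restarts of the API layer, which this sketch does not address.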




[GitHub] [arrow-ballista] avantgardnerio commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "avantgardnerio (via GitHub)" <gi...@apache.org>.
avantgardnerio commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1580799603

   > Is there any `yaml`
   
   There is a [helm chart](https://github.com/apache/arrow-ballista/blob/main/helm/ballista/Chart.yaml), and an example [kubernetes.yaml](https://github.com/apache/arrow-ballista/blob/main/docs/source/user-guide/deployment/kubernetes.md).




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1611396722

   ```
   local:
   INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 588.435µs 
   etcd:
   INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 77.631762ms
   ```
   I have been testing etcd as the storage backend, and its performance is very poor, especially when reserving and releasing resources. These operations take much longer than in local storage mode. I suspect the cause is the global distributed lock. Do you have any suggestions? @thinkharderdev @avantgardnerio




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1580694067

   Is there any `yaml` for deploying `scheduler`? 




[GitHub] [arrow-ballista] avantgardnerio commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "avantgardnerio (via GitHub)" <gi...@apache.org>.
avantgardnerio commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1584592564

   I have not deployed Ballista in a production environment, much less HA so I will let others share their experience.
   
   My understanding is that as long as you set up shared storage (etcd, sled, etc.), that is all that is required. (And a load balancer, as mentioned before. I think the helm chart did that.)




[GitHub] [arrow-ballista] yahoNanJing commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1628072182

   Hi @smallzhongfeng, could you explain the reason for using multiple schedulers? Is it just for HA, or are you worried about the performance of a single scheduler for task scheduling? If it is for HA, is it acceptable that all jobs scheduled on a scheduler fail when that scheduler goes down?




[GitHub] [arrow-ballista] thinkharderdev commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "thinkharderdev (via GitHub)" <gi...@apache.org>.
thinkharderdev commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1580285510

   Hi @smallzhongfeng. Do you want to deploy Ballista with multiple schedulers outside of Kubernetes? Standalone mode does not currently support multiple schedulers, as all it does is spin up the scheduler server in a tokio task with a hardcoded bind address of `localhost:50050`. But if you just want to deploy a cluster without Kubernetes or Docker, then it is possible (although I don't know of anyone who has done it that way). All you would need is some sort of load balancer to sit in front of your schedulers, and everything else should work.
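
   As a very small stand-in for the health-check part of such a load balancer, the sketch below returns the first scheduler address that accepts a TCP connection. It is an illustration only: `first_reachable` is a hypothetical helper, it uses nothing but the Rust standard library, and a real load balancer would also spread load and keep probing in the background.

   ```rust
   use std::net::{SocketAddr, TcpStream};
   use std::time::Duration;

   /// Hypothetical health probe: return the first scheduler address that accepts
   /// a TCP connection within `timeout` (which must be non-zero).
   fn first_reachable(schedulers: &[SocketAddr], timeout: Duration) -> Option<SocketAddr> {
       schedulers
           .iter()
           .copied()
           .find(|addr| TcpStream::connect_timeout(addr, timeout).is_ok())
   }
   ```

   A client could call this with the addresses of all deployed schedulers, for example `"127.0.0.1:50050".parse().unwrap()`, and connect to whichever one is returned.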




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1596524956

   Thanks for your reply again!
   Last question: can we extract the task scheduling part of the scheduler and make it a thread, like the Spark driver, so that when the scheduler hangs, it does not affect the execution of the task?




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1584444004

   I just want to be able to deploy multiple schedulers to ensure high availability of the scheduler.




[GitHub] [arrow-ballista] smallzhongfeng commented on issue #803: [Problem] How to deploy multiple schedulers on standalone mode but not docker

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on issue #803:
URL: https://github.com/apache/arrow-ballista/issues/803#issuecomment-1633655522

   > could you explain the reason of using multiple schedulers? Is it just for HA or worried about the performance of single scheduler for task scheduling?
   
   Hi @yahoNanJing. At present, we intend to support HA.
   
   > If for HA, is it acceptable that all jobs scheduled on a scheduler fail when a scheduler is down?
   
   We cannot accept the failure of all jobs during a scheduler switch, so we are adding support for retrying jobs during the switch.
   
   
   

