You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shay Elbaz <sh...@gm.com> on 2022/11/06 14:54:07 UTC

Re: [EXTERNAL] Re: Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

I don't think there is a definitive right or wrong approach here. The SLS feature would not have been added to Spark if there was no real need for it, and AFAIK it required quite a bit of refactoring of Spark internals. So I'm sure this discussion was already made in the developers community  :)

In my specific case, I need it also for interactive dev/research sessions on Jupyter notebooks, and it makes more sense to switch resources than stopping the session and starting a new one (over and over again).

Shay
________________________________
From: ayan guha <gu...@gmail.com>
Sent: Sunday, November 6, 2022 4:19 PM
To: Shay Elbaz <sh...@gm.com>
Cc: Artemis User <ar...@dtechspace.com>; Tom Graves <tg...@yahoo.com>; Tom Graves <tg...@yahoo.com.invalid>; user@spark.apache.org <us...@spark.apache.org>
Subject: [EXTERNAL] Re: Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs


ATTENTION: This email originated from outside of GM.


May I ask why the ETL job and DL ( Assuming you mean deep learning here) task can not be run as 2 separate spark job?

IMHO it is better practice to split up entire pipeline into logical steps and orchestrate them.

That way you can pick your profile as you need for 2 very different type of workloads.

Ayan

On Sun, 6 Nov 2022 at 12:04 am, Shay Elbaz <sh...@gm.com>> wrote:
Consider this:

  1.  The application is allowed to use only 20 GPUs.
  2.  To ensure exactly 20 GPUs, I use the df.rdd.repartition(20).withResources(gpus.build).mapPartitions(func) technique. (maxExecutors >> 20).
  3.  Given the volume of the input data, it takes 20 hours total to run the DL part (computer vision) on 20 GPUs, or 1 hour per GPU task.

Normally, I would repartition to 200 partitions to get a finer grained ~6 minutes tasks instead of 1 hour. But here we're "forced" to use only 20 partitions. To be clear, I'm only referring to potential failures/lags here. The job needs at least 20 hours total (on 20 GPUs) no matter what, but if any task fails after 50 minutes for example, we have to re-process these 50 minutes again. Or if a task/executor lags behind due to environment issues, then speculative execution will only trigger another task after 1 hour. These issues would be avoided if we used 200 partitions, but then Spark will try to allocate more than 20 GPUs.

I hope that was more clear.
Thank you very much for helping.

Shay

________________________________
From: Tom Graves <tg...@yahoo.com>>
Sent: Friday, November 4, 2022 4:19 PM
To: Tom Graves <tg...@yahoo.com.invalid>; Artemis User <ar...@dtechspace.com>>; user@spark.apache.org<ma...@spark.apache.org> <us...@spark.apache.org>>; Shay Elbaz <sh...@gm.com>>
Subject: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs


ATTENTION: This email originated from outside of GM.


So I'm not sure I completely follow. Are you asking for a way to change the limit without having to do the repartition?  And your DL software doesn't care if you got say 30 executors instead of 20?  Normally I would expect the number fo partitions at that point to be 200 (or whatever you set for your shuffle partitions) unless you are using AQE coalescing partitions functionality and then it could change. Are you using the latter?

> Normally I try to aim for anything between 30s-5m per task (failure-wise), depending on the cluster, its stability, etc. But in this case, individual tasks can take 30-60 minutes, if not much more. Any failure during this long time is pretty expensive.

Are you saying when you manually do the repartition your DL tasks take 30-60 minutes?  so again you want like AQE coalesce partitions to kick in to attempt to pick partition sizes for your?


Tom

On Thursday, November 3, 2022 at 03:18:07 PM CDT, Shay Elbaz <sh...@gm.com>> wrote:


This is exactly what we ended up doing! The only drawback I saw with this approach is that the GPU tasks get pretty big (in terms of data and compute time), and task failures become expansive. That's why I reached out to the mailing list in the first place 🙂
Normally I try to aim for anything between 30s-5m per task (failure-wise), depending on the cluster, its stability, etc. But in this case, individual tasks can take 30-60 minutes, if not much more. Any failure during this long time is pretty expensive.


Shay
________________________________
From: Tom Graves <tg...@yahoo.com.INVALID>
Sent: Thursday, November 3, 2022 7:56 PM
To: Artemis User <ar...@dtechspace.com>>; user@spark.apache.org<ma...@spark.apache.org> <us...@spark.apache.org>>; Shay Elbaz <sh...@gm.com>>
Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs


ATTENTION: This email originated from outside of GM.


Stage level scheduling does not allow you to change configs right now. This is something we thought about as follow on but have never implemented.  How many tasks on the DL stage are you running?  The typical case is run some etl lots of tasks... do mapPartitions and then run your DL stuff, before that mapPartitions you could do a repartition if necessary to get to exactly the number of tasks you want (20).  That way even if maxExecutors=500 you will only ever need 20 or whatever you repartition to and spark isn't going to ask for more then that.

Tom

On Thursday, November 3, 2022 at 11:10:31 AM CDT, Shay Elbaz <sh...@gm.com>> wrote:


Thanks again Artemis, I really appreciate it. I have watched the video but did not find an answer.

Please bear with me just one more iteration 🙂

Maybe I'll be more specific:
Suppose I start the application with maxExecutors=500, executors.cores=2, because that's the amount of resources needed for the ETL part. But for the DL part I only need 20 GPUs. SLS API only allows to set the resources per executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I configure the profile with 1 GPU per executor.
So, the question is how do I limit the stage resources to 20 GPUs total?

Thanks again,
Shay

________________________________
From: Artemis User <ar...@dtechspace.com>>
Sent: Thursday, November 3, 2022 5:23 PM

To: user@spark.apache.org<ma...@spark.apache.org> <us...@spark.apache.org>>
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of executors when using GPUs


ATTENTION: This email originated from outside of GM.

  Shay,  You may find this video helpful (with some API code samples that you are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc&t=171s.  The issue here isn't how to limit the number of executors but to request for the right GPU-enabled executors dynamically.  Those executors used in pre-GPU stages should be returned back to resource managers with dynamic resource allocation enabled (and with the right DRA policies).  Hope this helps..

Unfortunately there isn't a lot of detailed docs for this topic since GPU acceleration is kind of new in Spark (not straightforward like in TF).   I wish the Spark doc team could provide more details in the next release...

On 11/3/22 2:37 AM, Shay Elbaz wrote:
Thanks Artemis. We are not using Rapids, but rather using GPUs through the Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to turn on shuffle tracking for dynamic allocation, anyhow.
The question is how we can limit the number of executors when building a new ResourceProfile, directly (API) or indirectly (some advanced workaround).

Thanks,
Shay


________________________________
From: Artemis User <ar...@dtechspace.com>
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org<ma...@spark.apache.org> <us...@spark.apache.org>
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors when using GPUs


ATTENTION: This email originated from outside of GM.

  Are you using Rapids for GPU support in Spark?  Couple of options you may want to try:

  1.  In addition to dynamic allocation turned on, you may also need to turn on external shuffling service.
  2.  Sounds like you are using Kubernetes.  In that case, you may also need to turn on shuffle tracking.
  3.  The "stages" are controlled by the APIs.  The APIs for dynamic resource request (change of stage) do exist, but only for RDDs (e.g. TaskResourceRequest and ExecutorResourceRequest).

On 11/2/22 11:30 AM, Shay Elbaz wrote:
Hi,

Our typical applications need less executors for a GPU stage than for a CPU stage. We are using dynamic allocation with stage level scheduling, and Spark tries to maximize the number of executors also during the GPU stage, causing a bit of resources chaos in the cluster. This forces us to use a lower value for 'maxExecutors' in the first place, at the cost of the CPU stages performance. Or try to solve this in the Kubernets scheduler level, which is not straightforward and doesn't feel like the right way to go.

Is there a way to effectively use less executors in Stage Level Scheduling? The API does not seem to include such an option, but maybe there is some more advanced workaround?

Thanks,
Shay







--
Best Regards,
Ayan Guha