You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by "Hegner, Travis" <TH...@trilliumit.com> on 2016/12/02 20:27:31 UTC

SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Hello,


I've just created a JIRA to open up discussion of a new feature that I'd like to propose.


https://issues.apache.org/jira/browse/SPARK-18689


I'd love to get some feedback on the idea. I know that normally anything related to scheduling or queuing automatically throws up the "hard to implement" red flags, but the proposal contains a rather simple way to implement the concept, which delegates the scheduling logic to the actual kernel of each worker, rather than in any spark core code. I believe this to be more flexible and simpler to set up and maintain than dynamic allocation, and avoids the need for any preemption type of logic.


The proposal does not contain any code. I am not (yet) familiar enough with the core spark code to confidently create an implementation.


I appreciate your time and am looking forward to your feedback!


Thanks,


Travis

Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by "Hegner, Travis" <TH...@trilliumit.com>.
>The standalone thing was meant as a simple way to deploy Spark, and we gotta be careful with introducing a lot more features to it because then it becomes just a full fledged cluster manager and is duplicating the work of the other more mature ones.

Understood. There seems to be a very fine line between making "standalone" better, or more intuitive, versus just not using it. The only thing I'm after personally is the ability to run more than one application at once, on the same spark standalone cluster, sharing all available cores with a predetermined level of priority. To me, that is a more intuitive way for it to run. This patch accomplishes that in a little over 100 lines added to the code base. Wanting to leave it at that point without any further expansion to avoid duplicate work is understandable.

>Have you thought about contributing specific changes to these cluster managers to address the gaps you have seen?

Somewhat. I've tested each of those cluster managers and even contributed to Mesos for some docker functionality. The basic foundation in each of the designs is through static core allocation, which doesn't work well when you want to share workloads on a small cluster, while still giving those workloads full access to all resources when they are available. I haven't studied the YARN code base closely, so I can't comment on how difficult it would be to contribute similar changes there. I don't currently run a YARN cluster, I only intend to run spark jobs, and the standalone mode was better for those in my tests.

Studying the way spark standalone launches its executors led me to realize that there was a shortcut, and a very small amount of code, to accomplish my goal. Granted, my goal seems to be a completely different concept in the way cpu time is allocated, compared to how any of the cluster managers are doing it. But it is the way the kernel manages cpu time within a host. Everything that runs in linux is already running in a cgroup, and is allocated cpu time based on it's shares. Imagine if your workstation could only one run process at a time, all resources reserved for just that process whether it needed them or not.

I'm not trying to turn spark standalone into a cluster manager. I know I keep mentioning ways to expand cgroup functionality, but that is because I see a lot of value in allowing the kernel to manage the resources there. That is not my short term goal, however. My short term goal is allowing multiple simultaneous spark applications to run on the same cluster. Does anyone see value in just that short term goal?

Thanks,

Travis

________________________________
From: Reynold Xin <rx...@databricks.com>
Sent: Thursday, December 15, 2016 14:07
To: Hegner, Travis
Cc: Jörn Franke; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

In general this falls directly into the domain of external cluster managers (YARN, Mesos, Kub). The standalone thing was meant as a simple way to deploy Spark, and we gotta be careful with introducing a lot more features to it because then it becomes just a full fledged cluster manager and is duplicating the work of the other more mature ones.

Have you thought about contributing specific changes to these cluster managers to address the gaps you have seen?



On Thu, Dec 15, 2016 at 10:38 AM, Hegner, Travis <TH...@trilliumit.com>> wrote:

Thanks for the response Jörn,

This patch is intended only for spark standalone.

My understanding of the YARN cgroup support is that it only limits cpu, rather than allocates it based on the priority or shares system. This could be old documentation that I'm remembering, however. Another issue with YARN is that it has a lot more overhead than standalone mode, and always seemed a bit less responsive in general. Lastly, I remember struggling greatly with yet another resource abstraction layer (as if spark doesn't have enough already), it still statically allocated cores (albeit virtual ones), and it was much more cumbersome to find a proper balance of resources to request for an app.

My experience in trying to accomplish something like this in Mesos was always met with frustration because the system still statically allocated cores away to be reserved by individual apps. Trying to adjust the priority of individual applications was only possible by increasing the core count, further starving other apps of available cores. It was impossible to give a priority lower than the default to an app. The cpu.shares parameter was abstracted away as a multiple of the number of requested cores, which had a double down affect on the app: not only was it given more cores, it was also given a higher priority to run on them. Perhaps this has changed in more recent versions, but this was my experience when testing it.

I'm not familiar with a spark scheduler for kubernetes, unless you mean to launch a standalone cluster in containers with kubernetes? In that case, this patch would simply divvy up the resources allocated to the spark-worker container among each of it's executors, based on the shares that each executor is given. This is similar to how my current environment works, I'm just not using kubernetes as a container launcher. I found kubernetes was quite limiting in the way we wanted our network to be structured, and it also seemed quite difficult to get new functionality exposed in the form of their yaml API system.

My goal with this patch is to essentially eliminate the static allocation of cpu cores at all. Give each app time on the cpu equal to the number of shares it has as a percentage of the total pool.

Thanks,

Travis

________________________________
From: Jörn Franke <jo...@gmail.com>>
Sent: Thursday, December 15, 2016 12:48
To: Hegner, Travis
Cc: Apache Spark Dev

Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Hi,

What about yarn or mesos used in combination with Spark. The have also cgroups. Or a kubernetes etc deployment.

On 15 Dec 2016, at 17:37, Hegner, Travis <TH...@trilliumit.com>> wrote:


Hello Spark Devs,


I have finally completed a mostly working proof of concept. I do not want to create a pull request for this code, as I don't believe it's production worthy at the moment. My intent is to better communicate what I'd like to accomplish. Please review the following patch: https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.


What the code does:


Currently, it exposes two options "spark.cgroups.enabled", which defaults to false, and "spark.executor.shares" which defaults to None. When cgroups mode is enabled, a single executor is created on each worker, with access to all cores. The worker will create a parent cpu cgroup (on first executor launch) called "spark-worker" to house any executors that it launches. Each executor is put into it's own cgroup named with the app id, under the parent cgroup. The cpu.shares parameter is set to the value in "spark.executor.shares", if this is "None", it inherits the value from the parent cgroup.


Tested on Ubuntu 16:04 (docker containers), kernel 4.4.0-53-generic: I have not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to change this code base, but it looks like the kernel interface is the same for the most part.


I was able to launch a spark shell which consumed all cores in the cluster, but sat idle. I was then able to launch an application (client deploy-mode) which was also allocated all cores in the cluster, and ran to completion unhindered. Each of the executors on each worker was properly placed into it's respective cgroup, which in turn had the correct cpu.shares value allocated.


What the code still needs:


* Documentation (assuming the community moves forward with some kind of implementation)

* Sometimes the cgroups get destroyed after app completion, sometimes they don't. (need to put `.destroy()` call in a `finally` block., or in the `maybeCleanupApplication()` method; what do you think?)

* Proper handling of drivers's resources when running `--deploy-mode cluster`

* Better web UI indication of cgroup mode or core sharing (currently just shows up as an over allocation of cores per worker)

* Better environment/os/platform detection and testing (I won't be surprised if there is something broken if trying to run this on a different OS)

* Security/permissions for cgroups if running worker as non-root (perhaps creating the parent cgroup with correct permissions before launching the worker is all that is necessary)

  - running the worker in a container currently requires --privileged mode (I haven't figured out if/what capability makes cgroups writable, or if it's possible to use a new cgroup mount point)

* More user defined options

  - cgroup root path (currently hard coded)

  - driver cpu.shares (for cluster deploy-mode: would require a specially named cgroup... s"$appId-driver" ? default same #shares as executor? default double shares?

  - parent cpu.shares (currently os default)

  - parent cgroup name (currently hard coded)


I tried to structure the initial concept to make it easy to add support for more cgroup features (cpuset, mem, etc...), should the community feel there is value in adding them. Linux cgroups are an extremely powerful resource allocation and isolation tool, and this patch is only scratching the surface of their general capabilities. Of course, as Mr. Loughran's points out, expanding into these features will require more code maintenance, but not enough that we should shy away from it.


<opinion>

I personally believe that any multi-node resource allocation system should offload as much of the scheduling and resource allocation as possible to the underlying kernel within the node level. Each node's own kernel is the best equipped place to manage those resources. Only the node's kernel can allocate a few seconds worth of cpu to the low priority app, while the high priority app is waiting on disk I/O, and instantly give it back to the high priority app when it needs it, with (near) real-time granularity


The multi-node system should set up a proper framework to give each node's kernel the information it needs to allocate the resources correctly. Naturally, the system should allow resource reservations, and even limits, for the purposes of meeting and testing for SLAs and worst case scenarios as well. Linux cgroups are capable of doing those things in a near real-time fashion.


With a proper convention of priorities/shares for applications within an organization, I believe that everyone can get better throughput out of their hardware, at any cluster size. But, alas, *that* is not a problem I'm trying to solve currently.

</opinion>

Sorry that the patch is pretty rough still, as I'm still getting my head wrapped around spark's code base structure. Looking forward to any feedback.

Thanks,

Travis

________________________________
From: Hegner, Travis <TH...@trilliumit.com>>
Sent: Tuesday, December 6, 2016 10:49
To: Steve Loughran
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.



Steve,

I appreciate your experience and insight when dealing with large clusters at the data-center scale. I'm also well aware of the complex nature of schedulers, and that it is an area of ongoing research being done by people/companies with many more resources than I have. This might explain my apprehension in even calling this idea a *scheduler*: I wanted to avoid this exact kind of debate over what I want to accomplish. This is also why I mentioned that this idea will mostly benefit users with small clusters.

I've used many of the big named "cluster schedulers" (YARN, Mesos, and Kubernetes) and the main thing that they have in common is that they don't work well for my use case. Those systems are designed for large scale 1000+ node clusters, and become painful to manage in the small cluster range. Most of the tools that we've attempted to use don't work well for us, so we've written several of our own: https://github.com/trilliumit/.

It can be most easily stated by the fact that *we are not* Google, Facebook, or Amazon: we don't have a *data-center* of servers to manage, we barely have half of a rack. *We are not trying to solve the problem that you are referring to*. We are operating at a level that if we aren't meeting SLAs, then we could just buy another server to add to the cluster. I imagine that we are not alone in that fact either, I've seen that many of the questions on SO and on the user list are from others operating at a level similar to ours.

I understand that pre-emption isn't inherently a bad thing, and that these multi-node systems typically handle it gracefully. However, if idle CPU is expensive, then how much more does wasted CPU cost when a nearly complete task is pre-empted and has to be started over? Fortunately for me, that isn't a problem that I have to solve at the moment.

>Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads

See my above comment on how well these cluster schedulers work for us. I have considered the avenue of multiple spark clusters, and in reality the infrastructure we have set up would allow me to do this relatively easily. In fact, in my environment, this is a similar solution to what I'm proposing, just managed one layer up the stack and with less flexibility. I am trying to avoid this solution however because it does require more overhead and maintenance. What if I want two spark apps to run on the same cluster at the same time, sharing the available CPU capacity equally? I can't accomplish that easily with multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I don't have teams of ops people to task with managing as many spark clusters as I feel like launching.

>FWIW, it's often memory consumption that's most problematic here.

Perhaps in the use-cases you have experience with, but not currently in mine. In fact, my initial proposal is net yet changing the allocation of memory as a resource. This would still be statically allocated in a FIFO manner as long as memory is available on the cluster, the same way it is now.

>I would strongly encourage you to avoid this topic

Thanks for the suggestion, but I will choose how I spend my time. If I can find a simple solution to a problem that I face, and I'm willing to share that solution, I'd hope one would encourage that instead.


Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I am not trying to write a scheduler*: I am trying to slightly (and optionally) tweak the way executors are allocated and launched, so that I can more intuitively and more optimally utilize my small spark cluster.

Thanks,

Travis

________________________________
From: Steve Loughran <st...@hortonworks.com>>
Sent: Tuesday, December 6, 2016 06:54
To: Hegner, Travis
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

This is essentially what the cluster schedulers do: allow different people to submit work with different credentials and priority; cgroups & equivalent to limit granted resources to requested ones. If you have pre-emption enabled, you can even have one job kill work off the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems in the executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale computing —if you are curious about what is state of the art, look at the Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% drop in throughput would be.


I would strongly encourage you to avoid this topic, unless you want dive deep into the whole world of cluster scheduling, the debate over centralized vs decentralized, the idelogical one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge of swap, or more specifically, "how do you throttle memory consumption", as well as what to do when the IO load of a service is actually incurred on a completely different host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the Hadoop tree and look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl for the native code needed alongside this. Then consider that it's not just a matter of writing something similar, it's getting an OSS project to actually commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads, with different CPU & memory limits, queue priorities, etc. Other people have done the work, written the tests, deployed it in production, met their own SLAs *and are therefore committed to maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis <TH...@trilliumit.com>> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway.


That's the issue that surfaces in google papers: should jobs get idle capacity. Current consensus is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to meet it's SLAs in times of light cluster load, but precisely when the cluster is busy, it suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable" kind of stress. Instead you keep the cluster busy with low priority preemptible work, use labels to allocate specific hosts to high-SLA apps, etc.


Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs.



To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes.


FWIW, it's often memory consumption that's most problematic here. If one process starts to swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size; you have to spec that heap up front.


The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once.

In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it?



I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how complex it gets in a large system. Before you begin to write a line of code, I'd recommend

-you read as much of the published work as you can, including the google and microsoft papers, Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, that's YARN and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre a complex problem that is still considered a place for cutting-edge research. Avoid unless you want to do that.

-Steve




Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by Reynold Xin <rx...@databricks.com>.
In general this falls directly into the domain of external cluster managers
(YARN, Mesos, Kub). The standalone thing was meant as a simple way to
deploy Spark, and we gotta be careful with introducing a lot more features
to it because then it becomes just a full fledged cluster manager and is
duplicating the work of the other more mature ones.

Have you thought about contributing specific changes to these cluster
managers to address the gaps you have seen?



On Thu, Dec 15, 2016 at 10:38 AM, Hegner, Travis <TH...@trilliumit.com>
wrote:

> Thanks for the response Jörn,
>
> This patch is intended only for spark standalone.
>
> My understanding of the YARN cgroup support is that it only limits cpu,
> rather than allocates it based on the priority or shares system. This could
> be old documentation that I'm remembering, however. Another issue with YARN
> is that it has a lot more overhead than standalone mode, and always seemed
> a bit less responsive in general. Lastly, I remember struggling greatly
> with yet another resource abstraction layer (as if spark doesn't have
> enough already), it still statically allocated cores (albeit virtual
> ones), and it was much more cumbersome to find a proper balance of
> resources to request for an app.
>
> My experience in trying to accomplish something like this in Mesos was
> always met with frustration because the system still statically allocated
> cores away to be reserved by individual apps. Trying to adjust the priority
> of individual applications was only possible by increasing the core count,
> further starving other apps of available cores. It was impossible to give a
> priority lower than the default to an app. The cpu.shares parameter was
> abstracted away as a multiple of the number of requested cores, which had a
> double down affect on the app: not only was it given more cores, it was
> also given a higher priority to run on them. Perhaps this has changed in
> more recent versions, but this was my experience when testing it.
>
> I'm not familiar with a spark scheduler for kubernetes, unless you mean to
> launch a standalone cluster in containers with kubernetes? In that case,
> this patch would simply divvy up the resources allocated to the
> spark-worker container among each of it's executors, based on the shares
> that each executor is given. This is similar to how my current environment
> works, I'm just not using kubernetes as a container launcher. I found
> kubernetes was quite limiting in the way we wanted our network to be
> structured, and it also seemed quite difficult to get new functionality
> exposed in the form of their yaml API system.
>
> My goal with this patch is to essentially eliminate the static allocation
> of cpu cores at all. Give each app time on the cpu equal to the number of
> shares it has as a percentage of the total pool.
>
> Thanks,
>
> Travis
>
> ------------------------------
> *From:* Jörn Franke <jo...@gmail.com>
> *Sent:* Thursday, December 15, 2016 12:48
> *To:* Hegner, Travis
> *Cc:* Apache Spark Dev
>
> *Subject:* Re: SPARK-18689: A proposal for priority based app scheduling
> utilizing linux cgroups.
>
> Hi,
>
> What about yarn or mesos used in combination with Spark. The have also
> cgroups. Or a kubernetes etc deployment.
>
> On 15 Dec 2016, at 17:37, Hegner, Travis <TH...@trilliumit.com> wrote:
>
> Hello Spark Devs,
>
>
> I have finally completed a mostly working proof of concept. I do not want
> to create a pull request for this code, as I don't believe it's production
> worthy at the moment. My intent is to better communicate what I'd like to
> accomplish. Please review the following patch: https://github.com/
> apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.
>
>
> What the code does:
>
>
> Currently, it exposes two options "spark.cgroups.enabled", which defaults
> to false, and "spark.executor.shares" which defaults to None. When cgroups
> mode is enabled, a single executor is created on each worker, with access
> to all cores. The worker will create a parent cpu cgroup (on first executor
> launch) called "spark-worker" to house any executors that it launches. Each
> executor is put into it's own cgroup named with the app id, under the
> parent cgroup. The cpu.shares parameter is set to the value in
> "spark.executor.shares", if this is "None", it inherits the value from the
> parent cgroup.
>
>
> Tested on Ubuntu 16:04 (docker containers), kernel 4.4.0-53-generic: I
> have not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is
> going to change this code base, but it looks like the kernel interface is
> the same for the most part.
>
>
> I was able to launch a spark shell which consumed all cores in the
> cluster, but sat idle. I was then able to launch an application (client
> deploy-mode) which was also allocated all cores in the cluster, and ran to
> completion unhindered. Each of the executors on each worker was properly
> placed into it's respective cgroup, which in turn had the correct
> cpu.shares value allocated.
>
>
> What the code still needs:
>
>
> * Documentation (assuming the community moves forward with some kind of
> implementation)
>
> * Sometimes the cgroups get destroyed after app completion, sometimes they
> don't. (need to put `.destroy()` call in a `finally` block., or in the
> `maybeCleanupApplication()` method; what do you think?)
>
> * Proper handling of drivers's resources when running `--deploy-mode
> cluster`
>
> * Better web UI indication of cgroup mode or core sharing (currently just
> shows up as an over allocation of cores per worker)
>
> * Better environment/os/platform detection and testing (I won't be
> surprised if there is something broken if trying to run this on a different
> OS)
>
> * Security/permissions for cgroups if running worker as non-root (perhaps
> creating the parent cgroup with correct permissions before launching the
> worker is all that is necessary)
>
>   - running the worker in a container currently requires --privileged mode
> (I haven't figured out if/what capability makes cgroups writable, or if
> it's possible to use a new cgroup mount point)
>
> * More user defined options
>
>   - cgroup root path (currently hard coded)
>
>   - driver cpu.shares (for cluster deploy-mode: would require a specially
> named cgroup... s"$appId-driver" ? default same #shares as executor?
> default double shares?
>
>   - parent cpu.shares (currently os default)
>
>   - parent cgroup name (currently hard coded)
>
>
> I tried to structure the initial concept to make it easy to add support
> for more cgroup features (cpuset, mem, etc...), should the community feel
> there is value in adding them. Linux cgroups are an extremely powerful
> resource allocation and isolation tool, and this patch is only scratching
> the surface of their general capabilities. Of course, as Mr. Loughran's
> points out, expanding into these features will require more code
> maintenance, but not enough that we should shy away from it.
>
>
> <opinion>
>
> I personally believe that any multi-node resource allocation system should
> offload as much of the scheduling and resource allocation as possible to
> the underlying kernel within the node level. Each node's own kernel is the
> best equipped place to manage those resources. Only the node's kernel can
> allocate a few seconds worth of cpu to the low priority app, while the high
> priority app is waiting on disk I/O, and instantly give it back to the high
> priority app when it needs it, with (near) real-time granularity
>
>
> The multi-node system should set up a proper framework to give each node's
> kernel the information it needs to allocate the resources correctly. Naturally,
> the system should allow resource reservations, and even limits, for the
> purposes of meeting and testing for SLAs and worst case scenarios as
> well. Linux cgroups are capable of doing those things in a near real-time
> fashion.
>
>
> With a proper convention of priorities/shares for applications within an
> organization, I believe that everyone can get better throughput out of
> their hardware, at any cluster size. But, alas, *that* is not a problem I'm
> trying to solve currently.
>
> </opinion>
>
> Sorry that the patch is pretty rough still, as I'm still getting my head
> wrapped around spark's code base structure. Looking forward to any
> feedback.
>
> Thanks,
>
> Travis
>
> ------------------------------
> *From:* Hegner, Travis <TH...@trilliumit.com>
> *Sent:* Tuesday, December 6, 2016 10:49
> *To:* Steve Loughran
> *Cc:* Shuai Lin; Apache Spark Dev
> *Subject:* Re: SPARK-18689: A proposal for priority based app scheduling
> utilizing linux cgroups.
>
>
>
> Steve,
>
> I appreciate your experience and insight when dealing with large clusters
> at the data-center scale. I'm also well aware of the complex nature of
> schedulers, and that it is an area of ongoing research being done by
> people/companies with many more resources than I have. This might explain
> my apprehension in even calling this idea a *scheduler*: I wanted to avoid
> this exact kind of debate over what I want to accomplish. This is also why
> I mentioned that this idea will mostly benefit users with small clusters.
>
> I've used many of the big named "cluster schedulers" (YARN, Mesos,
> and Kubernetes) and the main thing that they have in common is that they
> don't work well for my use case. Those systems are designed for large scale
> 1000+ node clusters, and become painful to manage in the small cluster
> range. Most of the tools that we've attempted to use don't work well for
> us, so we've written several of our own: https://github.com/trilliumit/.
>
> It can be most easily stated by the fact that *we are not* Google,
> Facebook, or Amazon: we don't have a *data-center* of servers to manage, we
> barely have half of a rack. *We are not trying to solve the problem that
> you are referring to*. We are operating at a level that if we aren't
> meeting SLAs, then we could just buy another server to add to the cluster.
> I imagine that we are not alone in that fact either, I've seen that many of
> the questions on SO and on the user list are from others operating at a
> level similar to ours.
>
> I understand that pre-emption isn't inherently a bad thing, and that these
> multi-node systems typically handle it gracefully. However, if idle CPU is
> expensive, then how much more does wasted CPU cost when a nearly complete
> task is pre-empted and has to be started over? Fortunately for me, that
> isn't a problem that I have to solve at the moment.
>
> >Instead? Use a multi-user cluster scheduler and spin up different spark
> instances for the different workloads
>
> See my above comment on how well these cluster schedulers work for us. I
> have considered the avenue of multiple spark clusters, and in reality the
> infrastructure we have set up would allow me to do this relatively easily.
> In fact, in my environment, this is a similar solution to what I'm
> proposing, just managed one layer up the stack and with less flexibility. I
> am trying to avoid this solution however because it does require more
> overhead and maintenance. What if I want two spark apps to run on the same
> cluster at the same time, sharing the available CPU capacity equally? I
> can't accomplish that easily with multiple spark clusters. Also, we are a 1
> to 2 man operation at this point, I don't have teams of ops people to task
> with managing as many spark clusters as I feel like launching.
>
> >FWIW, it's often memory consumption that's most problematic here.
>
> Perhaps in the use-cases you have experience with, but not currently in
> mine. In fact, my initial proposal is net yet changing the allocation of
> memory as a resource. This would still be statically allocated in a FIFO
> manner as long as memory is available on the cluster, the same way it is
> now.
>
> >I would strongly encourage you to avoid this topic
>
> Thanks for the suggestion, but I will choose how I spend my time. If I can
> find a simple solution to a problem that I face, and I'm willing to share
> that solution, I'd hope one would encourage that instead.
>
>
> Perhaps I haven't yet clearly communicated what I'm trying to do. In
> short, *I am not trying to write a scheduler*: I am trying to slightly (and
> optionally) tweak the way executors are allocated and launched, so that I
> can more intuitively and more optimally utilize my small spark cluster.
>
> Thanks,
>
> Travis
>
> ------------------------------
> *From:* Steve Loughran <st...@hortonworks.com>
> *Sent:* Tuesday, December 6, 2016 06:54
> *To:* Hegner, Travis
> *Cc:* Shuai Lin; Apache Spark Dev
> *Subject:* Re: SPARK-18689: A proposal for priority based app scheduling
> utilizing linux cgroups.
>
> This is essentially what the cluster schedulers do: allow different people
> to submit work with different credentials and priority; cgroups &
> equivalent to limit granted resources to requested ones. If you have
> pre-emption enabled, you can even have one job kill work off the others.
> Spark does recognise pre-emption failures and doesn't treat it as a sign of
> problems in the executor, that is: it doesn't over-react.
>
> cluster scheduling is one of the cutting edge bits of datacentre-scale
> computing —if you are curious about what is state of the art, look at the
> Morning Paper https://blog.acolyer.org/ for coverage last week of MS and
> google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's
> not just meeting SLAs, its about how much idle CPU costs, and how expensive
> even a 1-2% drop in throughput would be.
>
>
> I would strongly encourage you to avoid this topic, unless you want dive
> deep into the whole world of cluster scheduling, the debate over
> centralized vs decentralized, the idelogical one of "should services ever
> get allocated RAM/CPU in times of low overall load?", the challenge of
> swap, or more specifically, "how do you throttle memory consumption", as
> well as what to do when the IO load of a service is actually incurred on a
> completely different host from the one your work is running on.
>
> There's also a fair amount of engineering work; to get a hint download the
> Hadoop tree and look at hadoop-yarn-project/hadoop-
> yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/
> apache/hadoop/yarn/server/nodemanager/containermanager/linux for the
> cgroup support, and then hadoop-yarn-project/hadoop-
> yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl
> for the native code needed alongside this. Then consider that it's not just
> a matter of writing something similar, it's getting an OSS project to
> actually commit to maintaining such code after you provide that initial
> contribution.
>
> Instead? Use a multi-user cluster scheduler and spin up different spark
> instances for the different workloads, with different CPU & memory limits,
> queue priorities, etc. Other people have done the work, written the tests,
> deployed it in production, met their own SLAs *and are therefore committed
> to maintaining this stuff*.
>
> -Steve
>
> On 5 Dec 2016, at 15:36, Hegner, Travis <TH...@trilliumit.com> wrote:
>
> My apologies, in my excitement of finding a rather simple way to
> accomplish the scheduling goal I have in mind, I hastily jumped straight
> into a technical solution, without explaining that goal, or the problem
> it's attempting to solve.
>
> You are correct that I'm looking for an additional running mode for the
> standalone scheduler. Perhaps you could/should classify it as a different
> scheduler, but I don't want to give the impression that this will be
> as difficult to implement as most schedulers are. Initially, from a memory
> perspective, we would still allocate in a FIFO manner. This new scheduling
> mode (or new scheduler, if you'd rather) would mostly benefit any users
> with small-ish clusters, both on-premise and cloud based. Essentially, my
> end goal is to be able to run multiple *applications* simultaneously with
> each application having *access* to the entire core count of the cluster.
>
> I have a very cpu intensive application that I'd like to run weekly. I
> have a second application that I'd like to run hourly. The hourly
> application is more time critical (higher priority), so I'd like it to
> finish in a small amount of time. If I allow the first app to run with all
> cores (this takes several days on my 64 core cluster), then nothing else
> can be executed when running with the default FIFO scheduler. All of the
> cores have been allocated to the first application, and it will not release
> them until it is finished. Dynamic allocation does not help in this case,
> as there is always a backlog of tasks to run until the first application is
> nearing the end anyway. Naturally, I could just limit the number of cores
> that the first application has access to, but then I have idle cpu time
> when the second app is not running, and that is not optimal. Secondly in
> that case, the second application only has access to the *leftover* cores
> that the first app has not allocated, and will take a considerably longer
> amount of time to run.
>
> You could also imagine a scenario where a developer has a spark-shell
> running without specifying the number of cores they want to utilize
> (whether intentionally or not). As I'm sure you know, the default is to
> allocate the entire cluster to this application. The cores allocated to
> this shell are unavailable to other applications, even if they are just
> sitting idle while a developer is getting their environment set up to run a
> very big job interactively. Other developers that would like to launch
> interactive shells are stuck waiting for the first one to exit their shell.
>
> My proposal would eliminate this static nature of core counts and allow as
> many simultaneous applications to be running as the cluster memory (still
> statically partitioned, at least initially) will allow. Applications could
> be configured with a "cpu shares" parameter (just an arbitrary integer
> relative only to other applications) which is essentially just passed
> through to the linux cgroup cpu.shares setting. Since each executor of an
> application on a given worker runs in it's own process/jvm, then that
> process could be easily be placed into a cgroup created and dedicated for
> that application.
>
> Linux cgroups cpu.shares are pretty well documented, but the gist is that
> processes competing for cpu time are allocated a percentage of time equal
> to their share count as a percentage of all shares in that level of the
> cgroup hierarchy. If two applications are both scheduled on the same core
> with the same weight, each will get to utilize 50% of the time on that
> core. This is all built into the kernel, and the only thing the spark
> worker has to do is create a cgroup for each application, set the
> cpu.shares parameter, and assign the executors for that application to the
> new cgroup. If multiple executors are running on a single worker, for a
> single application, the cpu time available to that application is divided
> among each of those executors equally. The default for cpu.shares is that
> they are not limiting in any way. A process can consume all available cpu
> time if it would otherwise be idle anyway.
>
>
>
> That's the issue that surfaces in google papers: should jobs get idle
> capacity. Current consensus is "no". Why not? Because you may end up
> writing an SLA-sensitive app which just happens to meet it's SLAs in times
> of light cluster load, but precisely when the cluster is busy, it suddenly
> slows down, leading to stress all round, in the "why is this service
> suddenly unusable" kind of stress. Instead you keep the cluster busy with
> low priority preemptible work, use labels to allocate specific hosts to
> high-SLA apps, etc.
>
>
> Another benefit to passing cpu.shares directly to the kernel (as opposed
> to some abstraction) is that cpu share allocations are heterogeneous to all
> processes running on a machine. An admin could have very fine grained
> control over which processes get priority access to cpu time, depending on
> their needs.
>
>
>
> To continue my personal example above, my long running cpu intensive
> application could utilize 100% of all cluster cores if they are idle.
> Then my time sensitive app could be launched with nine times the priority
> and the linux kernel would scale back the first application to 10% of all
> cores (completely seemlessly and automatically: no pre-emption, just fewer
> time slices of cpu allocated by the kernel to the first application), while
> the second application gets 90% of all the cores until it completes.
>
>
> FWIW, it's often memory consumption that's most problematic here. If one
> process starts to swap, it hurts everything else. But Java isn't that good
> at handling limited heap/memory size; you have to spec that heap up front.
>
>
> The only downside that I can think of currently is that this
> scheduling mode would create an increase in context switching on each host.
> This issue is somewhat mitigated by still statically allocating memory
> however, since there wouldn't typically be an exorbitant number of
> applications running at once.
>
> In my opinion, this would allow the most optimal usage of cluster
> resources. Linux cgroups allow you to control access to more than just cpu
> shares. You can apply the same concept to other resources (memory, disk
> io). You can also set up hard limits so that an application will never get
> more than is allocated to it. I know that those limitations are important
> for some use cases involving predictability of application execution
> times. Eventually, this idea could be expanded to include many more of the
> features that cgroups provide.
>
> Thanks again for any feedback on this idea. I hope that I have explained
> it a bit better now. Does anyone else can see value in it?
>
>
>
> I'm not saying "don't get involved in the scheduling problem"; I'm trying
> to show just how complex it gets in a large system. Before you begin to
> write a line of code, I'd recommend
>
> -you read as much of the published work as you can, including the google
> and microsoft papers, Facebook's FairScheduler work, etc, etc.
> -have a look at the actual code inside those schedulers whose source is
> public, that's YARN and Mesos.
> -try using these schedulers for your own workloads.
>
> really: scheduling work across a datacentre a complex problem that is
> still considered a place for cutting-edge research. Avoid unless you want
> to do that.
>
> -Steve
>
>
>

Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by "Hegner, Travis" <TH...@trilliumit.com>.
Thanks for the response Jörn,

This patch is intended only for spark standalone.

My understanding of the YARN cgroup support is that it only limits cpu, rather than allocates it based on the priority or shares system. This could be old documentation that I'm remembering, however. Another issue with YARN is that it has a lot more overhead than standalone mode, and always seemed a bit less responsive in general. Lastly, I remember struggling greatly with yet another resource abstraction layer (as if spark doesn't have enough already), it still statically allocated cores (albeit virtual ones), and it was much more cumbersome to find a proper balance of resources to request for an app.

My experience in trying to accomplish something like this in Mesos was always met with frustration because the system still statically allocated cores away to be reserved by individual apps. Trying to adjust the priority of individual applications was only possible by increasing the core count, further starving other apps of available cores. It was impossible to give a priority lower than the default to an app. The cpu.shares parameter was abstracted away as a multiple of the number of requested cores, which had a double down affect on the app: not only was it given more cores, it was also given a higher priority to run on them. Perhaps this has changed in more recent versions, but this was my experience when testing it.

I'm not familiar with a spark scheduler for kubernetes, unless you mean to launch a standalone cluster in containers with kubernetes? In that case, this patch would simply divvy up the resources allocated to the spark-worker container among each of it's executors, based on the shares that each executor is given. This is similar to how my current environment works, I'm just not using kubernetes as a container launcher. I found kubernetes was quite limiting in the way we wanted our network to be structured, and it also seemed quite difficult to get new functionality exposed in the form of their yaml API system.

My goal with this patch is to essentially eliminate the static allocation of cpu cores at all. Give each app time on the cpu equal to the number of shares it has as a percentage of the total pool.

Thanks,

Travis

________________________________
From: Jörn Franke <jo...@gmail.com>
Sent: Thursday, December 15, 2016 12:48
To: Hegner, Travis
Cc: Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Hi,

What about yarn or mesos used in combination with Spark. The have also cgroups. Or a kubernetes etc deployment.

On 15 Dec 2016, at 17:37, Hegner, Travis <TH...@trilliumit.com>> wrote:


Hello Spark Devs,


I have finally completed a mostly working proof of concept. I do not want to create a pull request for this code, as I don't believe it's production worthy at the moment. My intent is to better communicate what I'd like to accomplish. Please review the following patch: https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.


What the code does:


Currently, it exposes two options "spark.cgroups.enabled", which defaults to false, and "spark.executor.shares" which defaults to None. When cgroups mode is enabled, a single executor is created on each worker, with access to all cores. The worker will create a parent cpu cgroup (on first executor launch) called "spark-worker" to house any executors that it launches. Each executor is put into it's own cgroup named with the app id, under the parent cgroup. The cpu.shares parameter is set to the value in "spark.executor.shares", if this is "None", it inherits the value from the parent cgroup.


Tested on Ubuntu 16:04 (docker containers), kernel 4.4.0-53-generic: I have not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to change this code base, but it looks like the kernel interface is the same for the most part.


I was able to launch a spark shell which consumed all cores in the cluster, but sat idle. I was then able to launch an application (client deploy-mode) which was also allocated all cores in the cluster, and ran to completion unhindered. Each of the executors on each worker was properly placed into it's respective cgroup, which in turn had the correct cpu.shares value allocated.


What the code still needs:


* Documentation (assuming the community moves forward with some kind of implementation)

* Sometimes the cgroups get destroyed after app completion, sometimes they don't. (need to put `.destroy()` call in a `finally` block., or in the `maybeCleanupApplication()` method; what do you think?)

* Proper handling of drivers's resources when running `--deploy-mode cluster`

* Better web UI indication of cgroup mode or core sharing (currently just shows up as an over allocation of cores per worker)

* Better environment/os/platform detection and testing (I won't be surprised if there is something broken if trying to run this on a different OS)

* Security/permissions for cgroups if running worker as non-root (perhaps creating the parent cgroup with correct permissions before launching the worker is all that is necessary)

  - running the worker in a container currently requires --privileged mode (I haven't figured out if/what capability makes cgroups writable, or if it's possible to use a new cgroup mount point)

* More user defined options

  - cgroup root path (currently hard coded)

  - driver cpu.shares (for cluster deploy-mode: would require a specially named cgroup... s"$appId-driver" ? default same #shares as executor? default double shares?

  - parent cpu.shares (currently os default)

  - parent cgroup name (currently hard coded)


I tried to structure the initial concept to make it easy to add support for more cgroup features (cpuset, mem, etc...), should the community feel there is value in adding them. Linux cgroups are an extremely powerful resource allocation and isolation tool, and this patch is only scratching the surface of their general capabilities. Of course, as Mr. Loughran's points out, expanding into these features will require more code maintenance, but not enough that we should shy away from it.


<opinion>

I personally believe that any multi-node resource allocation system should offload as much of the scheduling and resource allocation as possible to the underlying kernel within the node level. Each node's own kernel is the best equipped place to manage those resources. Only the node's kernel can allocate a few seconds worth of cpu to the low priority app, while the high priority app is waiting on disk I/O, and instantly give it back to the high priority app when it needs it, with (near) real-time granularity


The multi-node system should set up a proper framework to give each node's kernel the information it needs to allocate the resources correctly. Naturally, the system should allow resource reservations, and even limits, for the purposes of meeting and testing for SLAs and worst case scenarios as well. Linux cgroups are capable of doing those things in a near real-time fashion.


With a proper convention of priorities/shares for applications within an organization, I believe that everyone can get better throughput out of their hardware, at any cluster size. But, alas, *that* is not a problem I'm trying to solve currently.

</opinion>

Sorry that the patch is pretty rough still, as I'm still getting my head wrapped around spark's code base structure. Looking forward to any feedback.

Thanks,

Travis

________________________________
From: Hegner, Travis <TH...@trilliumit.com>>
Sent: Tuesday, December 6, 2016 10:49
To: Steve Loughran
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.



Steve,

I appreciate your experience and insight when dealing with large clusters at the data-center scale. I'm also well aware of the complex nature of schedulers, and that it is an area of ongoing research being done by people/companies with many more resources than I have. This might explain my apprehension in even calling this idea a *scheduler*: I wanted to avoid this exact kind of debate over what I want to accomplish. This is also why I mentioned that this idea will mostly benefit users with small clusters.

I've used many of the big named "cluster schedulers" (YARN, Mesos, and Kubernetes) and the main thing that they have in common is that they don't work well for my use case. Those systems are designed for large scale 1000+ node clusters, and become painful to manage in the small cluster range. Most of the tools that we've attempted to use don't work well for us, so we've written several of our own: https://github.com/trilliumit/.

It can be most easily stated by the fact that *we are not* Google, Facebook, or Amazon: we don't have a *data-center* of servers to manage, we barely have half of a rack. *We are not trying to solve the problem that you are referring to*. We are operating at a level that if we aren't meeting SLAs, then we could just buy another server to add to the cluster. I imagine that we are not alone in that fact either, I've seen that many of the questions on SO and on the user list are from others operating at a level similar to ours.

I understand that pre-emption isn't inherently a bad thing, and that these multi-node systems typically handle it gracefully. However, if idle CPU is expensive, then how much more does wasted CPU cost when a nearly complete task is pre-empted and has to be started over? Fortunately for me, that isn't a problem that I have to solve at the moment.

>Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads

See my above comment on how well these cluster schedulers work for us. I have considered the avenue of multiple spark clusters, and in reality the infrastructure we have set up would allow me to do this relatively easily. In fact, in my environment, this is a similar solution to what I'm proposing, just managed one layer up the stack and with less flexibility. I am trying to avoid this solution however because it does require more overhead and maintenance. What if I want two spark apps to run on the same cluster at the same time, sharing the available CPU capacity equally? I can't accomplish that easily with multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I don't have teams of ops people to task with managing as many spark clusters as I feel like launching.

>FWIW, it's often memory consumption that's most problematic here.

Perhaps in the use-cases you have experience with, but not currently in mine. In fact, my initial proposal is net yet changing the allocation of memory as a resource. This would still be statically allocated in a FIFO manner as long as memory is available on the cluster, the same way it is now.

>I would strongly encourage you to avoid this topic

Thanks for the suggestion, but I will choose how I spend my time. If I can find a simple solution to a problem that I face, and I'm willing to share that solution, I'd hope one would encourage that instead.


Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I am not trying to write a scheduler*: I am trying to slightly (and optionally) tweak the way executors are allocated and launched, so that I can more intuitively and more optimally utilize my small spark cluster.

Thanks,

Travis

________________________________
From: Steve Loughran <st...@hortonworks.com>>
Sent: Tuesday, December 6, 2016 06:54
To: Hegner, Travis
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

This is essentially what the cluster schedulers do: allow different people to submit work with different credentials and priority; cgroups & equivalent to limit granted resources to requested ones. If you have pre-emption enabled, you can even have one job kill work off the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems in the executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale computing —if you are curious about what is state of the art, look at the Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% drop in throughput would be.


I would strongly encourage you to avoid this topic, unless you want dive deep into the whole world of cluster scheduling, the debate over centralized vs decentralized, the idelogical one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge of swap, or more specifically, "how do you throttle memory consumption", as well as what to do when the IO load of a service is actually incurred on a completely different host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the Hadoop tree and look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl for the native code needed alongside this. Then consider that it's not just a matter of writing something similar, it's getting an OSS project to actually commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads, with different CPU & memory limits, queue priorities, etc. Other people have done the work, written the tests, deployed it in production, met their own SLAs *and are therefore committed to maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis <TH...@trilliumit.com>> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway.


That's the issue that surfaces in google papers: should jobs get idle capacity. Current consensus is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to meet it's SLAs in times of light cluster load, but precisely when the cluster is busy, it suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable" kind of stress. Instead you keep the cluster busy with low priority preemptible work, use labels to allocate specific hosts to high-SLA apps, etc.


Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs.



To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes.


FWIW, it's often memory consumption that's most problematic here. If one process starts to swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size; you have to spec that heap up front.


The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once.

In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it?



I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how complex it gets in a large system. Before you begin to write a line of code, I'd recommend

-you read as much of the published work as you can, including the google and microsoft papers, Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, that's YARN and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre a complex problem that is still considered a place for cutting-edge research. Avoid unless you want to do that.

-Steve



Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by Jörn Franke <jo...@gmail.com>.
Hi,

What about yarn or mesos used in combination with Spark. The have also cgroups. Or a kubernetes etc deployment.

> On 15 Dec 2016, at 17:37, Hegner, Travis <TH...@trilliumit.com> wrote:
> 
> Hello Spark Devs,
> 
> 
> I have finally completed a mostly working proof of concept. I do not want to create a pull request for this code, as I don't believe it's production worthy at the moment. My intent is to better communicate what I'd like to accomplish. Please review the following patch: https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.
> 
> 
> What the code does:
> 
> Currently, it exposes two options "spark.cgroups.enabled", which defaults to false, and "spark.executor.shares" which defaults to None. When cgroups mode is enabled, a single executor is created on each worker, with access to all cores. The worker will create a parent cpu cgroup (on first executor launch) called "spark-worker" to house any executors that it launches. Each executor is put into it's own cgroup named with the app id, under the parent cgroup. The cpu.shares parameter is set to the value in "spark.executor.shares", if this is "None", it inherits the value from the parent cgroup.
> 
> 
> Tested on Ubuntu 16:04 (docker containers), kernel 4.4.0-53-generic: I have not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to change this code base, but it looks like the kernel interface is the same for the most part.
> 
> 
> I was able to launch a spark shell which consumed all cores in the cluster, but sat idle. I was then able to launch an application (client deploy-mode) which was also allocated all cores in the cluster, and ran to completion unhindered. Each of the executors on each worker was properly placed into it's respective cgroup, which in turn had the correct cpu.shares value allocated.
> 
> 
> What the code still needs:
> 
> 
> * Documentation (assuming the community moves forward with some kind of implementation)
> 
> * Sometimes the cgroups get destroyed after app completion, sometimes they don't. (need to put `.destroy()` call in a `finally` block., or in the `maybeCleanupApplication()` method; what do you think?)
> 
> * Proper handling of drivers's resources when running `--deploy-mode cluster`
> * Better web UI indication of cgroup mode or core sharing (currently just shows up as an over allocation of cores per worker)
> 
> * Better environment/os/platform detection and testing (I won't be surprised if there is something broken if trying to run this on a different OS)
> 
> * Security/permissions for cgroups if running worker as non-root (perhaps creating the parent cgroup with correct permissions before launching the worker is all that is necessary)
> 
>   - running the worker in a container currently requires --privileged mode (I haven't figured out if/what capability makes cgroups writable, or if it's possible to use a new cgroup mount point)
> 
> * More user defined options
> 
>   - cgroup root path (currently hard coded)
> 
>   - driver cpu.shares (for cluster deploy-mode: would require a specially named cgroup... s"$appId-driver" ? default same #shares as executor? default double shares?
> 
>   - parent cpu.shares (currently os default)
> 
>   - parent cgroup name (currently hard coded)
> 
> 
> 
> I tried to structure the initial concept to make it easy to add support for more cgroup features (cpuset, mem, etc...), should the community feel there is value in adding them. Linux cgroups are an extremely powerful resource allocation and isolation tool, and this patch is only scratching the surface of their general capabilities. Of course, as Mr. Loughran's points out, expanding into these features will require more code maintenance, but not enough that we should shy away from it.
> 
> 
> 
> <opinion>
> 
> I personally believe that any multi-node resource allocation system should offload as much of the scheduling and resource allocation as possible to the underlying kernel within the node level. Each node's own kernel is the best equipped place to manage those resources. Only the node's kernel can allocate a few seconds worth of cpu to the low priority app, while the high priority app is waiting on disk I/O, and instantly give it back to the high priority app when it needs it, with (near) real-time granularity
> 
> 
> 
> The multi-node system should set up a proper framework to give each node's kernel the information it needs to allocate the resources correctly. Naturally, the system should allow resource reservations, and even limits, for the purposes of meeting and testing for SLAs and worst case scenarios as well. Linux cgroups are capable of doing those things in a near real-time fashion.
> 
> 
> 
> With a proper convention of priorities/shares for applications within an organization, I believe that everyone can get better throughput out of their hardware, at any cluster size. But, alas, *that* is not a problem I'm trying to solve currently.
> 
> </opinion>
> 
> 
> Sorry that the patch is pretty rough still, as I'm still getting my head wrapped around spark's code base structure. Looking forward to any feedback.
> 
> Thanks,
> 
> Travis
> 
> From: Hegner, Travis <TH...@trilliumit.com>
> Sent: Tuesday, December 6, 2016 10:49
> To: Steve Loughran
> Cc: Shuai Lin; Apache Spark Dev
> Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.
>  
> 
> Steve,
> 
> I appreciate your experience and insight when dealing with large clusters at the data-center scale. I'm also well aware of the complex nature of schedulers, and that it is an area of ongoing research being done by people/companies with many more resources than I have. This might explain my apprehension in even calling this idea a *scheduler*: I wanted to avoid this exact kind of debate over what I want to accomplish. This is also why I mentioned that this idea will mostly benefit users with small clusters.
> 
> I've used many of the big named "cluster schedulers" (YARN, Mesos, and Kubernetes) and the main thing that they have in common is that they don't work well for my use case. Those systems are designed for large scale 1000+ node clusters, and become painful to manage in the small cluster range. Most of the tools that we've attempted to use don't work well for us, so we've written several of our own: https://github.com/trilliumit/.
> 
> It can be most easily stated by the fact that *we are not* Google, Facebook, or Amazon: we don't have a *data-center* of servers to manage, we barely have half of a rack. *We are not trying to solve the problem that you are referring to*. We are operating at a level that if we aren't meeting SLAs, then we could just buy another server to add to the cluster. I imagine that we are not alone in that fact either, I've seen that many of the questions on SO and on the user list are from others operating at a level  similar to ours.
> 
> I understand that pre-emption isn't inherently a bad thing, and that these multi-node systems typically handle it gracefully. However, if idle CPU is expensive, then how much more does wasted CPU cost when a nearly complete task is pre-empted and has to be started over? Fortunately for me, that isn't a problem that I have to solve at the moment.
> 
> >Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads
> 
> See my above comment on how well these cluster schedulers work for us. I have considered the avenue of multiple spark clusters, and in reality the infrastructure we have set up would allow me to do this relatively easily. In fact, in my environment, this is a similar solution to what I'm proposing, just managed one layer up the stack and with less flexibility. I am trying to avoid this solution however because it does require more overhead and maintenance. What if I want two spark apps to run on the same cluster at the same time, sharing the available CPU capacity equally? I can't accomplish that easily with multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I don't have teams of ops people to task with managing as many spark clusters as I feel like launching.
> 
> >FWIW, it's often memory consumption that's most problematic here.
> 
> Perhaps in the use-cases you have experience with, but not currently in mine. In fact, my initial proposal is net yet changing the allocation of memory as a resource. This would still be statically allocated in a FIFO manner as long as memory is available on the cluster, the same way it is now.
> 
> >I would strongly encourage you to avoid this topic
> 
> Thanks for the suggestion, but I will choose how I spend my time. If I can find a simple solution to a problem that I face, and I'm willing to share that solution, I'd hope one would encourage that instead.
> 
> 
> Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I am not trying to write a scheduler*: I am trying to slightly (and optionally) tweak the way executors are allocated and launched, so that I can more intuitively and more optimally utilize my small spark cluster.
> 
> Thanks,
> 
> Travis
> 
> From: Steve Loughran <st...@hortonworks.com>
> Sent: Tuesday, December 6, 2016 06:54
> To: Hegner, Travis
> Cc: Shuai Lin; Apache Spark Dev
> Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.
>  
> This is essentially what the cluster schedulers do: allow different people to submit work with different credentials and priority; cgroups & equivalent to limit granted resources to requested ones. If you have pre-emption enabled, you can even have one job kill work off the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems in the executor, that is: it doesn't over-react.
> 
> cluster scheduling is one of the cutting edge bits of datacentre-scale computing —if you are curious about what is state of the art, look at the Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% drop in throughput would be.
> 
> 
> I would strongly encourage you to avoid this topic, unless you want dive deep into the whole world of cluster scheduling, the debate over centralized vs decentralized, the idelogical one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge of swap, or more specifically, "how do you throttle memory consumption", as well as what to do when the IO load of a service is actually incurred on a completely different host from the one your work is running on. 
> 
> There's also a fair amount of engineering work; to get a hint download the Hadoop tree and look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl for the native code needed alongside this. Then consider that it's not just a matter of writing something similar, it's getting an OSS project to actually commit to maintaining such code after you provide that initial contribution.
> 
> Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads, with different CPU & memory limits, queue priorities, etc. Other people have done the work, written the tests, deployed it in production, met their own SLAs *and are therefore committed to maintaining this stuff*.
> 
> -Steve
> 
>> On 5 Dec 2016, at 15:36, Hegner, Travis <TH...@trilliumit.com> wrote:
>> 
>> My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve.
>> 
>> You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster.
>> 
>> I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run.
>> 
>> You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell.
>> 
>> My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application.
>> 
>> Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided  among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway.
> 
> 
> That's the issue that surfaces in google papers: should jobs get idle capacity. Current consensus is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to meet it's SLAs in times of light cluster load, but precisely when the cluster is busy, it suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable" kind of stress. Instead you keep the cluster busy with low priority preemptible work, use labels to allocate specific hosts to high-SLA apps, etc.
> 
>> 
>> Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs.
>> 
> 
> 
>> To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes.
>> 
> 
> FWIW, it's often memory consumption that's most problematic here. If one process starts to swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size; you have to spec that heap up front. 
> 
> 
>> The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once.
>> 
>> In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide.
>> 
>> Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it?
>> 
> 
> 
> I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how complex it gets in a large system. Before you begin to write a line of code, I'd recommend
> 
> -you read as much of the published work as you can, including the google and microsoft papers, Facebook's FairScheduler work, etc, etc.
> -have a look at the actual code inside those schedulers whose source is public, that's YARN and Mesos.
> -try using these schedulers for your own workloads.
> 
> really: scheduling work across a datacentre a complex problem that is still considered a place for cutting-edge research. Avoid unless you want to do that.
> 
> -Steve
> 
> 

Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by "Hegner, Travis" <TH...@trilliumit.com>.
Hello Spark Devs,


I have finally completed a mostly working proof of concept. I do not want to create a pull request for this code, as I don't believe it's production worthy at the moment. My intent is to better communicate what I'd like to accomplish. Please review the following patch: https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.


What the code does:


Currently, it exposes two options "spark.cgroups.enabled", which defaults to false, and "spark.executor.shares" which defaults to None. When cgroups mode is enabled, a single executor is created on each worker, with access to all cores. The worker will create a parent cpu cgroup (on first executor launch) called "spark-worker" to house any executors that it launches. Each executor is put into it's own cgroup named with the app id, under the parent cgroup. The cpu.shares parameter is set to the value in "spark.executor.shares", if this is "None", it inherits the value from the parent cgroup.


Tested on Ubuntu 16:04 (docker containers), kernel 4.4.0-53-generic: I have not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to change this code base, but it looks like the kernel interface is the same for the most part.


I was able to launch a spark shell which consumed all cores in the cluster, but sat idle. I was then able to launch an application (client deploy-mode) which was also allocated all cores in the cluster, and ran to completion unhindered. Each of the executors on each worker was properly placed into it's respective cgroup, which in turn had the correct cpu.shares value allocated.


What the code still needs:


* Documentation (assuming the community moves forward with some kind of implementation)

* Sometimes the cgroups get destroyed after app completion, sometimes they don't. (need to put `.destroy()` call in a `finally` block., or in the `maybeCleanupApplication()` method; what do you think?)

* Proper handling of drivers's resources when running `--deploy-mode cluster`

* Better web UI indication of cgroup mode or core sharing (currently just shows up as an over allocation of cores per worker)

* Better environment/os/platform detection and testing (I won't be surprised if there is something broken if trying to run this on a different OS)

* Security/permissions for cgroups if running worker as non-root (perhaps creating the parent cgroup with correct permissions before launching the worker is all that is necessary)

  - running the worker in a container currently requires --privileged mode (I haven't figured out if/what capability makes cgroups writable, or if it's possible to use a new cgroup mount point)

* More user defined options

  - cgroup root path (currently hard coded)

  - driver cpu.shares (for cluster deploy-mode: would require a specially named cgroup... s"$appId-driver" ? default same #shares as executor? default double shares?

  - parent cpu.shares (currently os default)

  - parent cgroup name (currently hard coded)


I tried to structure the initial concept to make it easy to add support for more cgroup features (cpuset, mem, etc...), should the community feel there is value in adding them. Linux cgroups are an extremely powerful resource allocation and isolation tool, and this patch is only scratching the surface of their general capabilities. Of course, as Mr. Loughran's points out, expanding into these features will require more code maintenance, but not enough that we should shy away from it.


<opinion>

I personally believe that any multi-node resource allocation system should offload as much of the scheduling and resource allocation as possible to the underlying kernel within the node level. Each node's own kernel is the best equipped place to manage those resources. Only the node's kernel can allocate a few seconds worth of cpu to the low priority app, while the high priority app is waiting on disk I/O, and instantly give it back to the high priority app when it needs it, with (near) real-time granularity


The multi-node system should set up a proper framework to give each node's kernel the information it needs to allocate the resources correctly. Naturally, the system should allow resource reservations, and even limits, for the purposes of meeting and testing for SLAs and worst case scenarios as well. Linux cgroups are capable of doing those things in a near real-time fashion.


With a proper convention of priorities/shares for applications within an organization, I believe that everyone can get better throughput out of their hardware, at any cluster size. But, alas, *that* is not a problem I'm trying to solve currently.

</opinion>

Sorry that the patch is pretty rough still, as I'm still getting my head wrapped around spark's code base structure. Looking forward to any feedback.

Thanks,

Travis

________________________________
From: Hegner, Travis <TH...@trilliumit.com>
Sent: Tuesday, December 6, 2016 10:49
To: Steve Loughran
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.



Steve,

I appreciate your experience and insight when dealing with large clusters at the data-center scale. I'm also well aware of the complex nature of schedulers, and that it is an area of ongoing research being done by people/companies with many more resources than I have. This might explain my apprehension in even calling this idea a *scheduler*: I wanted to avoid this exact kind of debate over what I want to accomplish. This is also why I mentioned that this idea will mostly benefit users with small clusters.

I've used many of the big named "cluster schedulers" (YARN, Mesos, and Kubernetes) and the main thing that they have in common is that they don't work well for my use case. Those systems are designed for large scale 1000+ node clusters, and become painful to manage in the small cluster range. Most of the tools that we've attempted to use don't work well for us, so we've written several of our own: https://github.com/trilliumit/.

It can be most easily stated by the fact that *we are not* Google, Facebook, or Amazon: we don't have a *data-center* of servers to manage, we barely have half of a rack. *We are not trying to solve the problem that you are referring to*. We are operating at a level that if we aren't meeting SLAs, then we could just buy another server to add to the cluster. I imagine that we are not alone in that fact either, I've seen that many of the questions on SO and on the user list are from others operating at a level similar to ours.

I understand that pre-emption isn't inherently a bad thing, and that these multi-node systems typically handle it gracefully. However, if idle CPU is expensive, then how much more does wasted CPU cost when a nearly complete task is pre-empted and has to be started over? Fortunately for me, that isn't a problem that I have to solve at the moment.

>Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads

See my above comment on how well these cluster schedulers work for us. I have considered the avenue of multiple spark clusters, and in reality the infrastructure we have set up would allow me to do this relatively easily. In fact, in my environment, this is a similar solution to what I'm proposing, just managed one layer up the stack and with less flexibility. I am trying to avoid this solution however because it does require more overhead and maintenance. What if I want two spark apps to run on the same cluster at the same time, sharing the available CPU capacity equally? I can't accomplish that easily with multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I don't have teams of ops people to task with managing as many spark clusters as I feel like launching.

>FWIW, it's often memory consumption that's most problematic here.

Perhaps in the use-cases you have experience with, but not currently in mine. In fact, my initial proposal is net yet changing the allocation of memory as a resource. This would still be statically allocated in a FIFO manner as long as memory is available on the cluster, the same way it is now.

>I would strongly encourage you to avoid this topic

Thanks for the suggestion, but I will choose how I spend my time. If I can find a simple solution to a problem that I face, and I'm willing to share that solution, I'd hope one would encourage that instead.


Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I am not trying to write a scheduler*: I am trying to slightly (and optionally) tweak the way executors are allocated and launched, so that I can more intuitively and more optimally utilize my small spark cluster.

Thanks,

Travis

________________________________
From: Steve Loughran <st...@hortonworks.com>
Sent: Tuesday, December 6, 2016 06:54
To: Hegner, Travis
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

This is essentially what the cluster schedulers do: allow different people to submit work with different credentials and priority; cgroups & equivalent to limit granted resources to requested ones. If you have pre-emption enabled, you can even have one job kill work off the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems in the executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale computing —if you are curious about what is state of the art, look at the Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% drop in throughput would be.


I would strongly encourage you to avoid this topic, unless you want dive deep into the whole world of cluster scheduling, the debate over centralized vs decentralized, the idelogical one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge of swap, or more specifically, "how do you throttle memory consumption", as well as what to do when the IO load of a service is actually incurred on a completely different host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the Hadoop tree and look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl for the native code needed alongside this. Then consider that it's not just a matter of writing something similar, it's getting an OSS project to actually commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads, with different CPU & memory limits, queue priorities, etc. Other people have done the work, written the tests, deployed it in production, met their own SLAs *and are therefore committed to maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis <TH...@trilliumit.com>> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway.


That's the issue that surfaces in google papers: should jobs get idle capacity. Current consensus is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to meet it's SLAs in times of light cluster load, but precisely when the cluster is busy, it suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable" kind of stress. Instead you keep the cluster busy with low priority preemptible work, use labels to allocate specific hosts to high-SLA apps, etc.


Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs.



To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes.


FWIW, it's often memory consumption that's most problematic here. If one process starts to swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size; you have to spec that heap up front.


The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once.

In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it?



I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how complex it gets in a large system. Before you begin to write a line of code, I'd recommend

-you read as much of the published work as you can, including the google and microsoft papers, Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, that's YARN and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre a complex problem that is still considered a place for cutting-edge research. Avoid unless you want to do that.

-Steve



Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by "Hegner, Travis" <TH...@trilliumit.com>.
Steve,

I appreciate your experience and insight when dealing with large clusters at the data-center scale. I'm also well aware of the complex nature of schedulers, and that it is an area of ongoing research being done by people/companies with many more resources than I have. This might explain my apprehension in even calling this idea a *scheduler*: I wanted to avoid this exact kind of debate over what I want to accomplish. This is also why I mentioned that this idea will mostly benefit users with small clusters.

I've used many of the big named "cluster schedulers" (YARN, Mesos, and Kubernetes) and the main thing that they have in common is that they don't work well for my use case. Those systems are designed for large scale 1000+ node clusters, and become painful to manage in the small cluster range. Most of the tools that we've attempted to use don't work well for us, so we've written several of our own: https://github.com/trilliumit/.

It can be most easily stated by the fact that *we are not* Google, Facebook, or Amazon: we don't have a *data-center* of servers to manage, we barely have half of a rack. *We are not trying to solve the problem that you are referring to*. We are operating at a level that if we aren't meeting SLAs, then we could just buy another server to add to the cluster. I imagine that we are not alone in that fact either, I've seen that many of the questions on SO and on the user list are from others operating at a level similar to ours.

I understand that pre-emption isn't inherently a bad thing, and that these multi-node systems typically handle it gracefully. However, if idle CPU is expensive, then how much more does wasted CPU cost when a nearly complete task is pre-empted and has to be started over? Fortunately for me, that isn't a problem that I have to solve at the moment.

>Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads

See my above comment on how well these cluster schedulers work for us. I have considered the avenue of multiple spark clusters, and in reality the infrastructure we have set up would allow me to do this relatively easily. In fact, in my environment, this is a similar solution to what I'm proposing, just managed one layer up the stack and with less flexibility. I am trying to avoid this solution however because it does require more overhead and maintenance. What if I want two spark apps to run on the same cluster at the same time, sharing the available CPU capacity equally? I can't accomplish that easily with multiple spark clusters. Also, we are a 1 to 2 man operation at this point, I don't have teams of ops people to task with managing as many spark clusters as I feel like launching.

>FWIW, it's often memory consumption that's most problematic here.

Perhaps in the use-cases you have experience with, but not currently in mine. In fact, my initial proposal is net yet changing the allocation of memory as a resource. This would still be statically allocated in a FIFO manner as long as memory is available on the cluster, the same way it is now.

>I would strongly encourage you to avoid this topic

Thanks for the suggestion, but I will choose how I spend my time. If I can find a simple solution to a problem that I face, and I'm willing to share that solution, I'd hope one would encourage that instead.


Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I am not trying to write a scheduler*: I am trying to slightly (and optionally) tweak the way executors are allocated and launched, so that I can more intuitively and more optimally utilize my small spark cluster.

Thanks,

Travis

________________________________
From: Steve Loughran <st...@hortonworks.com>
Sent: Tuesday, December 6, 2016 06:54
To: Hegner, Travis
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

This is essentially what the cluster schedulers do: allow different people to submit work with different credentials and priority; cgroups & equivalent to limit granted resources to requested ones. If you have pre-emption enabled, you can even have one job kill work off the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems in the executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale computing —if you are curious about what is state of the art, look at the Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% drop in throughput would be.


I would strongly encourage you to avoid this topic, unless you want dive deep into the whole world of cluster scheduling, the debate over centralized vs decentralized, the idelogical one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge of swap, or more specifically, "how do you throttle memory consumption", as well as what to do when the IO load of a service is actually incurred on a completely different host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the Hadoop tree and look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl for the native code needed alongside this. Then consider that it's not just a matter of writing something similar, it's getting an OSS project to actually commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads, with different CPU & memory limits, queue priorities, etc. Other people have done the work, written the tests, deployed it in production, met their own SLAs *and are therefore committed to maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis <TH...@trilliumit.com>> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway.


That's the issue that surfaces in google papers: should jobs get idle capacity. Current consensus is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to meet it's SLAs in times of light cluster load, but precisely when the cluster is busy, it suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable" kind of stress. Instead you keep the cluster busy with low priority preemptible work, use labels to allocate specific hosts to high-SLA apps, etc.


Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs.



To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes.


FWIW, it's often memory consumption that's most problematic here. If one process starts to swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size; you have to spec that heap up front.


The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once.

In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it?



I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how complex it gets in a large system. Before you begin to write a line of code, I'd recommend

-you read as much of the published work as you can, including the google and microsoft papers, Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, that's YARN and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre a complex problem that is still considered a place for cutting-edge research. Avoid unless you want to do that.

-Steve



Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by Steve Loughran <st...@hortonworks.com>.
This is essentially what the cluster schedulers do: allow different people to submit work with different credentials and priority; cgroups & equivalent to limit granted resources to requested ones. If you have pre-emption enabled, you can even have one job kill work off the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems in the executor, that is: it doesn't over-react.

cluster scheduling is one of the cutting edge bits of datacentre-scale computing —if you are curious about what is state of the art, look at the Morning Paper https://blog.acolyer.org/ for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use, at scale it's not just meeting SLAs, its about how much idle CPU costs, and how expensive even a 1-2% drop in throughput would be.

I would strongly encourage you to avoid this topic, unless you want dive deep into the whole world of cluster scheduling, the debate over centralized vs decentralized, the idelogical one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge of swap, or more specifically, "how do you throttle memory consumption", as well as what to do when the IO load of a service is actually incurred on a completely different host from the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the Hadoop tree and look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl for the native code needed alongside this. Then consider that it's not just a matter of writing something similar, it's getting an OSS project to actually commit to maintaining such code after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark instances for the different workloads, with different CPU & memory limits, queue priorities, etc. Other people have done the work, written the tests, deployed it in production, met their own SLAs *and are therefore committed to maintaining this stuff*.

-Steve

On 5 Dec 2016, at 15:36, Hegner, Travis <TH...@trilliumit.com>> wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway.


That's the issue that surfaces in google papers: should jobs get idle capacity. Current consensus is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to meet it's SLAs in times of light cluster load, but precisely when the cluster is busy, it suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable" kind of stress. Instead you keep the cluster busy with low priority preemptible work, use labels to allocate specific hosts to high-SLA apps, etc.


Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs.



To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes.


FWIW, it's often memory consumption that's most problematic here. If one process starts to swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size; you have to spec that heap up front.


The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once.

In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it?



I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how complex it gets in a large system. Before you begin to write a line of code, I'd recommend

-you read as much of the published work as you can, including the google and microsoft papers, Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, that's YARN and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre a complex problem that is still considered a place for cutting-edge research. Avoid unless you want to do that.

-Steve



Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by Michal Šenkýř <mi...@gmail.com>.
Hello Travis,


I am just a short-time member of this list but I can definitely see the 
benefit of using built-in OS resource management facilities to 
dynamically manage cluster resources on the node level in this manner. 
At our company we often fight for resources on our development cluster 
as well as sometimes cancel running jobs in production to free up 
immediately needed resources. If I understand it correctly, this would 
solve a lot of our problems.


The only downside I see with this is that it is Linux-specific.


Michal


On 5.12.2016 16:36, Hegner, Travis wrote:
>
> My apologies, in my excitement of finding a rather simple way to 
> accomplish the scheduling goal I have in mind, I hastily jumped 
> straight into a technical solution, without explaining that goal, or 
> the problem it's attempting to solve.
>
>
> You are correct that I'm looking for an additional running mode for 
> the standalone scheduler. Perhaps you could/should classify it as a 
> different scheduler, but I don't want to give the impression that this 
> will be as difficult to implement as most schedulers are. Initially, 
> from a memory perspective, we would still allocate in a FIFO 
> manner. This new scheduling mode (or new scheduler, if you'd 
> rather) would mostly benefit any users with small-ish clusters, both 
> on-premise and cloud based. Essentially, my end goal is to be able to 
> run multiple *applications* simultaneously with each application 
> having *access* to the entire core count of the cluster.
>
>
> I have a very cpu intensive application that I'd like to run weekly. I 
> have a second application that I'd like to run hourly. The hourly 
> application is more time critical (higher priority), so I'd like it to 
> finish in a small amount of time. If I allow the first app to run with 
> all cores (this takes several days on my 64 core cluster), then 
> nothing else can be executed when running with the default FIFO 
> scheduler. All of the cores have been allocated to the 
> first application, and it will not release them until it is finished. 
> Dynamic allocation does not help in this case, as there is always a 
> backlog of tasks to run until the first application is nearing the end 
> anyway. Naturally, I could just limit the number of cores that the 
> first application has access to, but then I have idle cpu time when 
> the second app is not running, and that is not optimal. Secondly in 
> that case, the second application only has access to the *leftover* 
> cores that the first app has not allocated, and will take a 
> considerably longer amount of time to run.
>
>
> You could also imagine a scenario where a developer has a spark-shell 
> running without specifying the number of cores they want to utilize 
> (whether intentionally or not). As I'm sure you know, the default is 
> to allocate the entire cluster to this application. The cores 
> allocated to this shell are unavailable to other applications, even if 
> they are just sitting idle while a developer is getting their 
> environment set up to run a very big job interactively. Other 
> developers that would like to launch interactive shells are stuck 
> waiting for the first one to exit their shell.
>
>
> My proposal would eliminate this static nature of core counts and 
> allow as many simultaneous applications to be running as the cluster 
> memory (still statically partitioned, at least initially) will allow. 
> Applications could be configured with a "cpu shares" parameter (just 
> an arbitrary integer relative only to other applications) which is 
> essentially just passed through to the linux cgroup cpu.shares 
> setting. Since each executor of an application on a given worker runs 
> in it's own process/jvm, then that process could be easily be placed 
> into a cgroup created and dedicated for that application.
>
>
> Linux cgroups cpu.shares are pretty well documented, but the gist is 
> that processes competing for cpu time are allocated a percentage of 
> time equal to their share count as a percentage of all shares in that 
> level of the cgroup hierarchy. If two applications are both scheduled 
> on the same core with the same weight, each will get to utilize 50% of 
> the time on that core. This is all built into the kernel, and the only 
> thing the spark worker has to do is create a cgroup for each 
> application, set the cpu.shares parameter, and assign the executors 
> for that application to the new cgroup. If multiple executors are 
> running on a single worker, for a single application, the cpu time 
> available to that application is divided among each of those executors 
> equally. The default for cpu.shares is that they are not limiting in 
> any way. A process can consume all available cpu time if it would 
> otherwise be idle anyway.
>
>
> Another benefit to passing cpu.shares directly to the kernel (as 
> opposed to some abstraction) is that cpu share allocations are 
> heterogeneous to all processes running on a machine. An admin could 
> have very fine grained control over which processes get priority 
> access to cpu time, depending on their needs.
>
>
> To continue my personal example above, my long running cpu intensive 
> application could utilize 100% of all cluster cores if they are idle. 
> Then my time sensitive app could be launched with nine times the 
> priority and the linux kernel would scale back the first application 
> to 10% of all cores (completely seemlessly and automatically: no 
> pre-emption, just fewer time slices of cpu allocated by the kernel to 
> the first application), while the second application gets 90% of all 
> the cores until it completes.
>
>
> The only downside that I can think of currently is that this 
> scheduling mode would create an increase in context switching on each 
> host. This issue is somewhat mitigated by still statically allocating 
> memory however, since there wouldn't typically be an exorbitant number 
> of applications running at once.
>
>
> In my opinion, this would allow the most optimal usage of cluster 
> resources. Linux cgroups allow you to control access to more than just 
> cpu shares. You can apply the same concept to other resources (memory, 
> disk io). You can also set up hard limits so that an application will 
> never get more than is allocated to it. I know that those limitations 
> are important for some use cases involving predictability of 
> application execution times. Eventually, this idea could be expanded 
> to include many more of the features that cgroups provide.
>
>
> Thanks again for any feedback on this idea. I hope that I have 
> explained it a bit better now. Does anyone else can see value in it?
>
>
> Travis
>
>
> ------------------------------------------------------------------------
> *From:* Shuai Lin <li...@gmail.com>
> *Sent:* Saturday, December 3, 2016 06:52
> *To:* Hegner, Travis
> *Cc:* dev@spark.apache.org
> *Subject:* Re: SPARK-18689: A proposal for priority based app 
> scheduling utilizing linux cgroups.
> Sorry but I don't get the scope of the problem from your description. 
> Seems it's an improvement for spark standalone scheduler (i.e. not for 
> yarn or mesos)?
>
> On Sat, Dec 3, 2016 at 4:27 AM, Hegner, Travis <THegner@trilliumit.com 
> <ma...@trilliumit.com>> wrote:
>
>     Hello,
>
>
>     I've just created a JIRA to open up discussion of a new feature
>     that I'd like to propose.
>
>
>     https://issues.apache.org/jira/browse/SPARK-18689
>     <https://issues.apache.org/jira/browse/SPARK-18689>
>
>
>     I'd love to get some feedback on the idea. I know that normally
>     anything related to scheduling or queuing automatically throws up
>     the "hard to implement" red flags, but the proposal contains a
>     rather simple way to implement the concept, which delegates the
>     scheduling logic to the actual kernel of each worker, rather than
>     in any spark core code. I believe this to be more flexible and
>     simpler to set up and maintain than dynamic allocation, and avoids
>     the need for any preemption type of logic.
>
>
>     The proposal does not contain any code. I am not (yet) familiar
>     enough with the core spark code to confidently create an
>     implementation.
>
>
>     I appreciate your time and am looking forward to your feedback!
>
>
>     Thanks,
>
>
>     Travis
>
>


Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by "Hegner, Travis" <TH...@trilliumit.com>.
My apologies, in my excitement of finding a rather simple way to accomplish the scheduling goal I have in mind, I hastily jumped straight into a technical solution, without explaining that goal, or the problem it's attempting to solve.


You are correct that I'm looking for an additional running mode for the standalone scheduler. Perhaps you could/should classify it as a different scheduler, but I don't want to give the impression that this will be as difficult to implement as most schedulers are. Initially, from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode (or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters, both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications* simultaneously with each application having *access* to the entire core count of the cluster.


I have a very cpu intensive application that I'd like to run weekly. I have a second application that I'd like to run hourly. The hourly application is more time critical (higher priority), so I'd like it to finish in a small amount of time. If I allow the first app to run with all cores (this takes several days on my 64 core cluster), then nothing else can be executed when running with the default FIFO scheduler. All of the cores have been allocated to the first application, and it will not release them until it is finished. Dynamic allocation does not help in this case, as there is always a backlog of tasks to run until the first application is nearing the end anyway. Naturally, I could just limit the number of cores that the first application has access to, but then I have idle cpu time when the second app is not running, and that is not optimal. Secondly in that case, the second application only has access to the *leftover* cores that the first app has not allocated, and will take a considerably longer amount of time to run.


You could also imagine a scenario where a developer has a spark-shell running without specifying the number of cores they want to utilize (whether intentionally or not). As I'm sure you know, the default is to allocate the entire cluster to this application. The cores allocated to this shell are unavailable to other applications, even if they are just sitting idle while a developer is getting their environment set up to run a very big job interactively. Other developers that would like to launch interactive shells are stuck waiting for the first one to exit their shell.


My proposal would eliminate this static nature of core counts and allow as many simultaneous applications to be running as the cluster memory (still statically partitioned, at least initially) will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary integer relative only to other applications) which is essentially just passed through to the linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs in it's own process/jvm, then that process could be easily be placed into a cgroup created and dedicated for that application.


Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing for cpu time are allocated a percentage of time equal to their share count as a percentage of all shares in that level of the cgroup hierarchy. If two applications are both scheduled on the same core with the same weight, each will get to utilize 50% of the time on that core. This is all built into the kernel, and the only thing the spark worker has to do is create a cgroup for each application, set the cpu.shares parameter, and assign the executors for that application to the new cgroup. If multiple executors are running on a single worker, for a single application, the cpu time available to that application is divided among each of those executors equally. The default for cpu.shares is that they are not limiting in any way. A process can consume all available cpu time if it would otherwise be idle anyway.


Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction) is that cpu share allocations are heterogeneous to all processes running on a machine. An admin could have very fine grained control over which processes get priority access to cpu time, depending on their needs.


To continue my personal example above, my long running cpu intensive application could utilize 100% of all cluster cores if they are idle. Then my time sensitive app could be launched with nine times the priority and the linux kernel would scale back the first application to 10% of all cores (completely seemlessly and automatically: no pre-emption, just fewer time slices of cpu allocated by the kernel to the first application), while the second application gets 90% of all the cores until it completes.


The only downside that I can think of currently is that this scheduling mode would create an increase in context switching on each host. This issue is somewhat mitigated by still statically allocating memory however, since there wouldn't typically be an exorbitant number of applications running at once.


In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups allow you to control access to more than just cpu shares. You can apply the same concept to other resources (memory, disk io). You can also set up hard limits so that an application will never get more than is allocated to it. I know that those limitations are important for some use cases involving predictability of application execution times. Eventually, this idea could be expanded to include many more of the features that cgroups provide.


Thanks again for any feedback on this idea. I hope that I have explained it a bit better now. Does anyone else can see value in it?


Travis

________________________________
From: Shuai Lin <li...@gmail.com>
Sent: Saturday, December 3, 2016 06:52
To: Hegner, Travis
Cc: dev@spark.apache.org
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Sorry but I don't get the scope of the problem from your description. Seems it's an improvement for spark standalone scheduler (i.e. not for yarn or mesos)?

On Sat, Dec 3, 2016 at 4:27 AM, Hegner, Travis <TH...@trilliumit.com>> wrote:

Hello,


I've just created a JIRA to open up discussion of a new feature that I'd like to propose.


https://issues.apache.org/jira/browse/SPARK-18689


I'd love to get some feedback on the idea. I know that normally anything related to scheduling or queuing automatically throws up the "hard to implement" red flags, but the proposal contains a rather simple way to implement the concept, which delegates the scheduling logic to the actual kernel of each worker, rather than in any spark core code. I believe this to be more flexible and simpler to set up and maintain than dynamic allocation, and avoids the need for any preemption type of logic.


The proposal does not contain any code. I am not (yet) familiar enough with the core spark code to confidently create an implementation.


I appreciate your time and am looking forward to your feedback!


Thanks,


Travis


Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

Posted by Shuai Lin <li...@gmail.com>.
Sorry but I don't get the scope of the problem from your description. Seems
it's an improvement for spark standalone scheduler (i.e. not for yarn or
mesos)?

On Sat, Dec 3, 2016 at 4:27 AM, Hegner, Travis <TH...@trilliumit.com>
wrote:

> Hello,
>
>
> I've just created a JIRA to open up discussion of a new feature that I'd
> like to propose.
>
>
> https://issues.apache.org/jira/browse/SPARK-18689
>
>
> I'd love to get some feedback on the idea. I know that normally anything
> related to scheduling or queuing automatically throws up the "hard to
> implement" red flags, but the proposal contains a rather simple way to
> implement the concept, which delegates the scheduling logic to the
> actual kernel of each worker, rather than in any spark core code. I believe
> this to be more flexible and simpler to set up and maintain than dynamic
> allocation, and avoids the need for any preemption type of logic.
>
>
> The proposal does not contain any code. I am not (yet) familiar enough
> with the core spark code to confidently create an implementation.
>
>
> I appreciate your time and am looking forward to your feedback!
>
>
> Thanks,
>
>
> Travis
>