Posted to dev@flink.apache.org by MaGuoWei <ma...@outlook.com> on 2015/07/27 03:51:24 UTC

add some new api to the scheduler in the job manager

Hi guys,

Flink must hold all of its resources all the time. That wastes resources sometimes, especially in a shared cluster. For example, when using Flink on YARN, resources can't be returned to the ResourceManager even if no job graph is running. So I want to know: is it possible to add some APIs to the scheduler to request a resource (slot) and release a resource (slot)?
These APIs could be customized for different circumstances.

Thanks a lot!
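A rough sketch of the kind of scheduler API being proposed here. All names are hypothetical; no such interface exists in Flink at this point:

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical sketch of the proposed scheduler hooks: acquire slots from the
// resource manager on demand and hand idle ones back when no graph is running.
// None of these names exist in Flink; they only illustrate the idea.
class ElasticSlotScheduler {
    private final int maxSlots;                    // upper bound granted by the RM
    private final Queue<Integer> idle = new ArrayDeque<>();
    private int allocated = 0;                     // slots currently held from the RM

    ElasticSlotScheduler(int maxSlots) { this.maxSlots = maxSlots; }

    /** Request a slot: reuse an idle one, or allocate a new one up to the limit. */
    Optional<Integer> requestSlot() {
        if (!idle.isEmpty()) return Optional.of(idle.poll());
        if (allocated < maxSlots) return Optional.of(allocated++);
        return Optional.empty();                   // cluster limit reached
    }

    /** Release a slot back into the idle pool when its task finishes. */
    void releaseSlot(int slot) { idle.add(slot); }

    /** Return every idle slot to the resource manager (e.g. no graph running). */
    int returnIdleSlotsToResourceManager() {
        int freed = idle.size();
        allocated -= freed;
        idle.clear();
        return freed;
    }

    int heldSlots() { return allocated; }

    public static void main(String[] args) {
        ElasticSlotScheduler scheduler = new ElasticSlotScheduler(2);
        int slot = scheduler.requestSlot().get();
        scheduler.releaseSlot(slot);
        System.out.println("freed " + scheduler.returnIdleSlotsToResourceManager() + " slot(s)");
    }
}
```

On YARN, `returnIdleSlotsToResourceManager()` would correspond to releasing containers back to the ResourceManager, which is exactly what the integration cannot do today.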


Re: add some new api to the scheduler in the job manager

Posted by Robert Metzger <rm...@apache.org>.
Hi Ankur,

I am not aware of any up-to-date papers about the internals of Flink, but
the links on this wiki page:
https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals contain a
lot of helpful material.

I'm very happy to see that you are interested in contributing and
maintaining the flink-mesos integration :)

Robert

On Thu, Aug 20, 2015 at 8:10 AM, ankurcha <an...@malloc64.com> wrote:

> [quoted messages trimmed; the full text of each message appears elsewhere in this thread]

Re: add some new api to the scheduler in the job manager

Posted by ankurcha <an...@malloc64.com>.
Hi Robert,

I agree with everything that you and Stephan are saying. I haven't looked
into the Flink codebase and the papers/design docs enough to comment at a
finer level, so maybe that's the first piece of homework that I need to do
(I need pointers for that).

And yes, I would definitely be interested in owning/maintaining/extending
the flink-mesos integration.

-- Ankur Chauhan

> On 19 Aug 2015, at 13:05, Robert Metzger [via Apache Flink Mailing List archive.] <ml...@n3.nabble.com> wrote:
> 
> [quoted message trimmed; the full reply appears later in this thread]





--
View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/add-some-new-api-to-the-scheduler-in-the-job-manager-tp7153p7581.html
Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.

Re: add some new api to the scheduler in the job manager

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
one thing to also keep in mind here is that TaskManagers might have to keep
their intermediate data even after a job is finished. For example, the user
might have an interactive scala-shell session where they are exploring some
data and transforming it in several steps as they go.

Cheers,
Aljoscha

On Wed, 19 Aug 2015 at 22:05 Robert Metzger <rm...@apache.org> wrote:

> [quoted message trimmed; the full reply appears later in this thread]

Re: add some new api to the scheduler in the job manager

Posted by Robert Metzger <rm...@apache.org>.
Hi,

I'm sorry for the late reply; I'm still working through a long email
backlog from a one-week vacation ;)
Thank you for the long reply in this thread. Your observations are very
good.
I think we should indeed work towards more fine-grained "intra-job"
elasticity. Let me comment on some of your statements below ...

  * The taskManagers that are being killed off may have resources that are
> needed by other tasks, so they can't always be killed off
> (files/intermediate results etc.). This means that there needs to be some
> sort of "are you idle?" handshake.


I think we have to distinguish between streaming and batch API jobs here.
- For deployed streaming jobs, it's usually impossible to take away
TaskManagers, because we are working on infinite streams (tasks are never
done). The simplest thing we can do is stop machines to which no tasks are
deployed.
As Stephan mentioned, dynamic scaling of streaming jobs is certainly
something interesting for the future. There, we would need a component
that implements some sort of scaling policy (for example, based on
throughput, load, or latency). To scale up or down, we would then redeploy
the job. For this feature, we certainly need nicely abstracted APIs
for YARN and Mesos to alter the running cluster.
- Batch jobs are usually executed in a pipelined (i.e. streaming) fashion;
we would need to execute them in a batch fashion instead (otherwise, tasks
do not finish one after another). Flink's runtime already has support
for that. With some additional logic that lets us recognize when an
intermediate dataset has been fully consumed by downstream tasks, we can
safely deallocate machines in a Flink cluster. I think such logic could be
implemented in the JobManager's scheduler.
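A minimal sketch of the bookkeeping described above, releasing resources once an intermediate dataset has been fully consumed downstream. All names are hypothetical; this is not Flink's actual runtime code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: count how many downstream consumers still need each
// intermediate dataset; once a count reaches zero, the machines producing it
// can safely be handed back to the cluster manager. Not actual Flink code.
class IntermediateResultTracker {
    private final Map<String, Integer> pendingConsumers = new HashMap<>();
    private final Set<String> released = new HashSet<>();

    /** Announce a dataset and how many downstream tasks will read it. */
    void registerDataset(String datasetId, int consumerCount) {
        pendingConsumers.put(datasetId, consumerCount);
    }

    /**
     * Called when one downstream task finishes reading the dataset.
     * Returns true exactly once: when the dataset becomes fully consumed
     * and the producers' resources may be deallocated.
     */
    boolean consumerFinished(String datasetId) {
        int remaining = pendingConsumers.merge(datasetId, -1, Integer::sum);
        return remaining == 0 && released.add(datasetId);
    }

    public static void main(String[] args) {
        IntermediateResultTracker tracker = new IntermediateResultTracker();
        tracker.registerDataset("map-output", 2);
        tracker.consumerFinished("map-output");   // one consumer still pending
        System.out.println(tracker.consumerFinished("map-output"));
    }
}
```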

* Ideally, we would want to isolate the logic (a general scheduler) that
> says "get me a slot meeting constraints X" into one module which utilises
> another module (YARN or Mesos) that takes such a request and satisfies the
> needs of the former. This idea is sort of inspired by the way this
> separation exists in Apache Spark and seems to work out well.


The JobManager of Flink has a component which schedules a job graph in
the cluster. I think right now the system assumes that a certain number of
machines and processing slots are available.
But it should not be too difficult to have something like "fake" machines
and slots there, which are allocated on demand as needed (so that you
basically give the system an upper limit of resources to allocate).

I agree with Stephan that a good first step towards fine-grained elasticity
would be a common interface for both YARN and Mesos.
For YARN, there are currently these (pretty YARN-specific) abstract classes:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java

I'd suggest that we first merge the "flink-mesos" integration once it's
done. After that, we can try to come up with a common interface.
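A common abstraction over the two YARN-specific classes linked above and a future Mesos counterpart might look roughly like this. The interface is purely illustrative, not a proposed Flink API:

```java
// Hypothetical sketch of a cluster-manager-neutral interface that both the
// YARN classes above and a future flink-mesos backend could implement.
// The names are illustrative only.
interface ClusterResourceManager {
    /** Ask the cluster manager to launch additional TaskManager containers. */
    void requestTaskManagers(int count, int memoryMb, int slotsPerTaskManager);

    /** Hand idle TaskManagers back to the cluster manager. */
    void releaseTaskManagers(int count);

    /** TaskManagers currently registered with the JobManager. */
    int registeredTaskManagers();
}

// Toy in-memory stand-in for a YARN or Mesos backend, for illustration.
class InMemoryResourceManager implements ClusterResourceManager {
    private int taskManagers = 0;

    public void requestTaskManagers(int count, int memoryMb, int slotsPerTaskManager) {
        taskManagers += count;    // a real backend would request containers here
    }

    public void releaseTaskManagers(int count) {
        taskManagers = Math.max(0, taskManagers - count);
    }

    public int registeredTaskManagers() { return taskManagers; }

    public static void main(String[] args) {
        ClusterResourceManager rm = new InMemoryResourceManager();
        rm.requestTaskManagers(3, 1024, 2);
        rm.releaseTaskManagers(1);
        System.out.println(rm.registeredTaskManagers() + " TaskManagers registered");
    }
}
```

The JobManager's scheduler would then talk only to `ClusterResourceManager`, keeping all YARN- and Mesos-specific code behind the interface.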

Are you interested in working towards that feature after the "flink-mesos"
integration?

Best,
Robert


On Tue, Aug 11, 2015 at 10:22 AM, ankurcha <an...@malloc64.com> wrote:

> Hi Stephan / others interested,
>
> I have been working on the flink-mesos integration, and there are definitely
> some thoughts that I would like to share about the commonalities with the
> flink-yarn integration.
>
> * Both the flink-mesos and flink-yarn integrations as they stand today can be
> considered "coarse-grained" scheduling integrations. This means that the
> tasks that are spawned (the task managers) are long-lived.
>
> * MaGuoWei is referring to something (as correctly identified by Stephan)
> that I like to call "fine-grained" scheduling integration, where the task
> managers are relinquished by the framework when they aren't being utilised
> by Flink. This means that when the next job is executed, the job manager
> and/or framework will spawn new task managers. This also has an implied
> requirement that each taskManager runs one task and is then discarded.
>
> * Coarse-grained scheduling is preferable when we want interactive
> (sub-second) response times, and waiting for a resource offer to be accepted
> and for a new taskManager JVM to spin up is not acceptable. The downside is
> that long-running tasks may lead to underutilisation of the shared cluster.
>
> * Fine-grained scheduling is preferable when a little delay (due to starting
> a new taskManager JVM) is acceptable. This means that we will have higher
> utilisation of the cluster in a shared setting, as resources that aren't
> being used are relinquished. But we need to be a lot more careful about
> this approach. Some of the cases that I can think of are:
>   * The jobManager/integration-framework may need to monitor the utilisation
> of the taskManagers and kill off taskManagers based on some cool-down
> timeout.
>   * The taskManagers that are being killed off may have resources that are
> needed by other tasks, so they can't always be killed off
> (files/intermediate results etc.). This means that there needs to be some
> sort of "are you idle?" handshake.
>   * I like "fine-grained" mode, but there may need to be a middle ground
> where tasks are "coarse-grained", i.e. they run multiple operators and, once
> idle for a certain amount of time, are reaped/killed off by the
> jobManager/integration-framework.
>
> * Ideally, we would want to isolate the logic (a general scheduler) that
> says "get me a slot meeting constraints X" into one module which utilises
> another module (YARN or Mesos) that takes such a request and satisfies the
> needs of the former. This idea is sort of inspired by the way this
> separation exists in Apache Spark and seems to work out well.
>
> * I don't know the codebase well enough to say where these things should go,
> but based on my reading of the overall architecture of the system, there is
> nothing that can't be satisfied by flink-runtime, and it *should* not need
> any detailed access to the execution plan. I'll defer this to someone who
> knows the internals better.
>
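The cool-down and "are you idle?" ideas in the quoted bullets above could be sketched like this. All names are hypothetical; a real implementation would live in the jobManager/integration-framework:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch: reap TaskManagers that have been idle longer than a
// cool-down timeout, but only after an "are you idle?" check confirms they
// hold no files/intermediate results that other tasks still need.
class IdleReaper {
    private final long cooldownMillis;
    private final Map<String, Long> idleSince = new HashMap<>();

    IdleReaper(long cooldownMillis) { this.cooldownMillis = cooldownMillis; }

    void markIdle(String taskManagerId, long nowMillis) {
        idleSince.putIfAbsent(taskManagerId, nowMillis);
    }

    void markBusy(String taskManagerId) {
        idleSince.remove(taskManagerId);
    }

    /**
     * Returns the number of TaskManagers reaped at time {@code nowMillis}.
     * {@code holdsNeededState} stands in for the "are you idle?" handshake.
     */
    int reap(long nowMillis, Map<String, Boolean> holdsNeededState) {
        int reaped = 0;
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            boolean cooledDown = nowMillis - e.getValue() >= cooldownMillis;
            boolean safe = !holdsNeededState.getOrDefault(e.getKey(), false);
            if (cooledDown && safe) {
                it.remove();   // a real implementation would kill the container
                reaped++;
            }
        }
        return reaped;
    }

    public static void main(String[] args) {
        IdleReaper reaper = new IdleReaper(1000);
        reaper.markIdle("tm-1", 0L);
        System.out.println(reaper.reap(2000L, new HashMap<>()) + " TaskManager(s) reaped");
    }
}
```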

Re: add some new api to the scheduler in the job manager

Posted by ankurcha <an...@malloc64.com>.
Hi Stephan / others interested,

I have been working on the flink-mesos integration and there are definitely
some thoughts that I would like to share some thoughts about the
commonalities with the flink-yarn integration. 

* Both the flink-mesos and flink-yarn integrations, as they stand today, can
be considered "coarse-grained" scheduling integrations. This means that the
tasks that are spawned (the task managers) are long-lived.

* MaGuoWei is referring to something (as correctly identified by Stephan)
that I like to call "fine-grained" scheduling integration, where the task
managers are relinquished by the framework when they aren't being utilised
by Flink. This means that when the next job is executed, the job manager
and/or framework will spawn new task managers. This also implies that each
taskManager runs one task and is then discarded.

* Coarse-grained scheduling is preferable when we want interactive
(sub-second) response, so that waiting for a resource offer to be accepted
and for a new taskManager JVM to spin up is not acceptable. The downside is
that long-running tasks may lead to underutilisation of the shared cluster.

* Fine-grained scheduling is preferable when a little delay (due to starting
a new taskManager JVM) is acceptable. This means that we will have higher
utilisation of the cluster in a shared setting, as resources that aren't
being used are relinquished. But we need to be a lot more careful about
this approach. Some of the cases that I can think of are:
  * The jobManager/integration-framework may need to monitor the utilisation
of the taskManagers and kill off taskManagers based on some cool-down
timeout.
  * The taskManagers that are being killed off may hold resources that are
needed by other tasks, so they can't always be killed off
(files/intermediate results etc.). This means that there needs to be some
sort of "are you idle?" handshake.
  * I like "fine-grained" mode, but there may need to be a middle ground
where tasks are "coarse-grained", i.e. run multiple operators, and once idle
for a certain amount of time, they are reaped/killed-off by the
jobManager/integration-framework.

* Ideally, we would want to isolate the logic (a general scheduler) that
says "get me a slot meeting constraints X" into one module, which utilises
another module (YARN or Mesos) that takes such a request and satisfies the
needs of the former. This idea is inspired by the way this separation exists
in Apache Spark and seems to work out well.

* I don't know the codebase well enough to say where these things should go.
Based on my reading of the overall architecture of the system, there is
nothing here that can't be satisfied by flink-runtime, and it *should* not
need any detailed access to the execution plan. I'll defer this to someone
who knows the internals better.
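To make the cool-down reaping and "are you idle?" handshake above concrete, here is a minimal sketch of the bookkeeping the jobManager/integration-framework would need. All names here are hypothetical illustrations, not Flink APIs:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Rough model of the reaping logic: track each taskManager's last activity,
// and only release a taskManager once it has been idle past the cool-down
// AND is not holding files/intermediate results needed by other tasks.
class IdleReaper {

    private final long coolDownMillis;
    // taskManager id -> timestamp of its last finished task
    private final Map<String, Long> lastActive = new HashMap<>();
    // taskManagers still holding state needed by other tasks
    private final Set<String> holdingState = new HashSet<>();

    IdleReaper(long coolDownMillis) {
        this.coolDownMillis = coolDownMillis;
    }

    void taskFinished(String tmId, long now) {
        lastActive.put(tmId, now);
    }

    void stateRetained(String tmId) {
        holdingState.add(tmId);
    }

    void stateReleased(String tmId) {
        holdingState.remove(tmId);
    }

    /**
     * The "are you idle?" check: reap only if the taskManager has been idle
     * past the cool-down timeout and holds no results others still need.
     */
    boolean mayReap(String tmId, long now) {
        Long last = lastActive.get(tmId);
        if (last == null) {
            return false; // never seen this taskManager
        }
        if (holdingState.contains(tmId)) {
            return false; // intermediate results still needed
        }
        return now - last >= coolDownMillis;
    }
}
```

In a real integration the `stateRetained`/`stateReleased` signals would come from the handshake with the taskManager itself rather than from local bookkeeping.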



--
View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/add-some-new-api-to-the-scheduler-in-the-job-manager-tp7153p7448.html
Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.

Re: add some new api to the scheduler in the job manager

Posted by Stephan Ewen <se...@apache.org>.
To properly abstract the resource manager behind an interface, it would be
good to see what the common overlap of the YARN integration, and the
pending mesos integration is.

Can anyone working on this chime in?
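One way to picture that common overlap: the scheduler asks "get me a slot meeting constraints X" against one small interface, and a YARN- or Mesos-specific module implements it. The interface and every name below are hypothetical sketches, not actual Flink, YARN, or Mesos API:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// The scheduler-facing abstraction: request and release slots without
// knowing which cluster manager satisfies the request.
interface ClusterResourceManager {
    /** Request a slot meeting the given constraints (e.g. "memoryMB"). */
    Optional<String> requestSlot(Map<String, Integer> constraints);

    /** Hand a slot back so the underlying RM can reclaim the resources. */
    void releaseSlot(String slotId);
}

// Minimal in-memory stand-in, just to show the contract a YARN- or
// Mesos-backed implementation would satisfy.
class LocalResourceManager implements ClusterResourceManager {

    private final int memoryPerSlotMB;
    private final Set<String> live = new HashSet<>();
    private int next = 0;

    LocalResourceManager(int memoryPerSlotMB) {
        this.memoryPerSlotMB = memoryPerSlotMB;
    }

    @Override
    public Optional<String> requestSlot(Map<String, Integer> constraints) {
        if (constraints.getOrDefault("memoryMB", 0) > memoryPerSlotMB) {
            return Optional.empty(); // cannot satisfy the constraint
        }
        String id = "slot-" + (next++);
        live.add(id);
        return Optional.of(id);
    }

    @Override
    public void releaseSlot(String slotId) {
        live.remove(slotId);
    }

    int liveSlots() {
        return live.size();
    }
}
```

A YARN implementation would translate `requestSlot` into container requests to the RM, while a Mesos implementation would accept or decline resource offers; the scheduler above this interface stays the same.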




Re: add some new api to the scheduler in the job manager

Posted by Stephan Ewen <se...@apache.org>.
I think what MaGuoWei is looking for is "intra-job" YARN elasticity.

Adding these hooks is going to be important also to trigger streaming jobs
to scale in/out during runtime.

On Tue, Jul 28, 2015 at 3:27 AM, MaGuoWei <ma...@outlook.com> wrote:

> Thanks, all you guys. Now I know I can achieve this goal by creating a
> cluster per topology and estimating the cluster size by analyzing the
> JobGraph (or by some user input). But I think it could be more beautiful if
> Flink could abstract some common resource APIs such as get/release/deploy. :-)
> thanks.
>

RE: add some new api to the scheduler in the job manager

Posted by MaGuoWei <ma...@outlook.com>.
Thanks, all you guys. Now I know I can achieve this goal by creating a cluster per topology and estimating the cluster size by analyzing the JobGraph (or by some user input). But I think it could be more beautiful if Flink could abstract some common resource APIs such as get/release/deploy. :-)
thanks.



> From: rmetzger@apache.org
> Date: Mon, 27 Jul 2015 14:01:11 +0200
> Subject: Re: add some new api to the scheduler in the job manager
> To: dev@flink.apache.org
> 
> Hi MaGuoWei,
> 
> would you like to have this done automatically by Flink, or based on some
> user input?
> Adding commands to the ./bin/yarn-session.sh to change the cluster size is
> quite easy. However, reducing the cluster size while a job is running will
> fail the job.
> 
> Making this automatically is much harder.
> 

Re: add some new api to the scheduler in the job manager

Posted by Robert Metzger <rm...@apache.org>.
Hi MaGuoWei,

would you like to have this done automatically by Flink, or based on some
user input?
Adding commands to the ./bin/yarn-session.sh to change the cluster size is
quite easy. However, reducing the cluster size while a job is running will
fail the job.

Making this automatically is much harder.

On Mon, Jul 27, 2015 at 12:07 PM, Maximilian Michels <mx...@apache.org> wrote:

> Hi MaGuoWei,
>
> If I understand correctly, you're looking for a way to have a job
> manager (master) continuously running which requests YARN containers for
> the task managers (workers) on the fly. This is currently not supported by
> Flink, although you can add or remove task managers while the cluster is
> running. The job manager distributes tasks to the task managers which are
> available at scheduling time.
>
> As Aljoscha mentioned, the most flexible way of sharing resources in a YARN
> environment is to start a per-job cluster for each job.
>
> Cheers,
> Max
>

Re: add some new api to the scheduler in the job manager

Posted by Maximilian Michels <mx...@apache.org>.
Hi MaGuoWei,

If I understand correctly, you're looking for a way to have a job
manager (master) continuously running which requests YARN containers for
the task managers (workers) on the fly. This is currently not supported by
Flink, although you can add or remove task managers while the cluster is
running. The job manager distributes tasks to the task managers which are
available at scheduling time.

As Aljoscha mentioned, the most flexible way of sharing resources in a YARN
environment is to start a per-job cluster for each job.

Cheers,
Max

On Mon, Jul 27, 2015 at 11:19 AM, MaGuoWei <ma...@outlook.com> wrote:

> It is great! Is there any document? I am very interested in this.
> thanks
>

RE: add some new api to the scheduler in the job manager

Posted by MaGuoWei <ma...@outlook.com>.
It is great! Is there any document? I am very interested in this.
thanks


> From: aljoscha@apache.org
> Date: Mon, 27 Jul 2015 05:14:00 +0000
> Subject: Re: add some new api to the scheduler in the job manager
> To: dev@flink.apache.org
> 
> Hi,
> I think for more details on giving back resources of a running cluster we
> have to wait for Robert's opinion. In the meantime, you can also just run
> a single job that will bring up some YARN containers and then release them
> afterwards, using this:
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn
> 
> Cheers,
> Aljoscha
> 

Re: add some new api to the scheduler in the job manager

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think for more details on giving back resources of a running cluster we
have to wait for Robert's opinion. In the meantime, you can also just run
a single job that will bring up some YARN containers and then release them
afterwards, using this:
https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn

Cheers,
Aljoscha

On Mon, 27 Jul 2015 at 03:51 MaGuoWei <ma...@outlook.com> wrote:

> hi guys, Flink must take over all the resources all the time. That wastes
> resources sometimes, especially in a shared cluster. For example, when
> using Flink on YARN the resources can't be returned to the RM even if no
> graph is running. So I want to know: is it possible to add some APIs in the
> scheduler to request a resource (slot) and release a resource (slot)?
> These APIs could be customized for different circumstances.
> thanks a lot!
>
>
>