Posted to user@flink.apache.org by Edward Rojas <ed...@gmail.com> on 2018/03/15 18:53:39 UTC

Migration to Flip6 Kubernetes

Hello,

Currently I have a Flink 1.4 cluster running on Kubernetes, based on the
configuration described at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html
with additional configuration for HA with Zookeeper.

With this I have several TaskManagers and a single JobManager, and I create a
container for each job to perform the job submission and manage job updates
with savepoints.


I'm looking into what would be needed to migrate to the new FLIP-6
architecture, as we are planning to use Flink 1.5 once it's ready.

If I understand correctly from
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
and the current code on master:

* TaskManagers would continue the same, i.e. they will execute the
taskmanager.sh start-foreground script, which, with the flip6 mode activated,
will execute the new taskexecutor.TaskManagerRunner.

* We will now have one JobManager per job, which is really good; but I don't
fully understand how this would be started.
 
I notice that jobmanager.sh with the flip6 mode activated will execute
entrypoint.StandaloneSessionClusterEntrypoint, but I don't see how we could
pass the job jar and parameters (?)

So I think the other possibility to start the job would be via the flink run
command, maybe with an option to indicate that we are creating a job with its
own JobManager, or would this be the default behaviour?

Or would this be the role of the JobMaster? I haven't taken a look at its
code, but it's mentioned on the FLIP-6 page (however, I don't see an
entrypoint for it from the scripts (?))

Could you help me understand how this is expected to be done?


* Also, I'm not sure I understand whether it would be better to have a
ResourceManager per job or a single ResourceManager per cluster. The page
states that there is a ResourceManager for the self-contained single-job
case, but it seems to me that it would need information about all JobManagers
and TaskManagers (?)


Thanks in advance for any help you can provide.

I'm interested in using FLIP-6 on Kubernetes when it's ready, so I could help
with some testing if needed.

--
Edward




Re: Migration to Flip6 Kubernetes

Posted by Till Rohrmann <tr...@apache.org>.
Hi Edward and Eron,

you're right that there is currently no JobClusterEntrypoint implementation
for Kubernetes. What this entrypoint looks like mostly depends on how the
job is stored and retrieved. Multiple approaches are conceivable:

- The entrypoint connects to an external system from which it fetches the
JobGraph
- The entrypoint contains the serialized JobGraph, similar to how the
YarnJobClusterEntrypoint works, but this would mean that you have a
separate image per job (see the sketch after this list)
- The entrypoint actually executes a user jar which generates the JobGraph
similar to what happens on the client when you submit a job
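
To make the second option a bit more concrete, below is a minimal sketch of
what a Kubernetes per-job entrypoint could do to recover a pre-serialized
JobGraph from a file baked into (or mounted into) the image. The class name,
environment variable and path are only assumptions for illustration, and the
actual cluster bootstrapping (ResourceManager, MiniDispatcher, JobMaster) is
omitted:

// Illustrative sketch only: recover a JobGraph that was serialized with plain
// Java serialization and shipped inside the image. Names and paths are
// hypothetical; this is not an existing Flink class.
import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

import org.apache.flink.runtime.jobgraph.JobGraph;

public class KubernetesJobClusterEntrypointSketch {

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // Hypothetical convention: the serialized JobGraph sits at a fixed path.
        String jobGraphPath =
            System.getenv().getOrDefault("JOB_GRAPH_PATH", "/opt/flink/job.graph");

        JobGraph jobGraph = loadJobGraph(jobGraphPath);
        System.out.println(
            "Recovered job " + jobGraph.getName() + " (" + jobGraph.getJobID() + ")");

        // A real entrypoint would now start the cluster components and hand the
        // JobGraph over, similar to what the YarnJobClusterEntrypoint does.
    }

    private static JobGraph loadJobGraph(String path)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(path))) {
            return (JobGraph) in.readObject();
        }
    }
}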

I'm not a Kubernetes expert, so I don't know which approach is the most
idiomatic. But once we have figured this out, it should not be too difficult
to write the Kubernetes JobClusterEntrypoint.

If we say that Kubernetes is responsible for assigning new resources, then
we need a special KubernetesResourceManager which automatically assigns all
registered slots to the single JobMaster. This JobMaster would then accept
all slots and scale the job to however many slots it got offered. That way we
could easily let K8s control the resources.
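
Conceptually, the slot handling on the JobMaster side would then be something
like the following sketch. None of these types exist in Flink; they are
placeholders that only illustrate the "accept everything and rescale"
behaviour:

// Purely conceptual sketch: the single JobMaster accepts every slot it is
// offered and rescales the job to the total number of slots it holds.
// All types below are placeholders, not Flink classes.
import java.util.ArrayList;
import java.util.List;

public class ReactiveSlotHandlingSketch {

    /** Placeholder for a slot offered by a (hypothetical) KubernetesResourceManager. */
    static final class SlotOffer {
        final String slotId;
        SlotOffer(String slotId) {
            this.slotId = slotId;
        }
    }

    /** Placeholder for the part of the job that can be rescaled. */
    interface RescalableJob {
        void rescaleTo(int parallelism);
    }

    private final List<SlotOffer> acceptedSlots = new ArrayList<>();
    private final RescalableJob job;

    ReactiveSlotHandlingSketch(RescalableJob job) {
        this.job = job;
    }

    /** Called whenever the ResourceManager offers newly registered slots. */
    void offerSlots(List<SlotOffer> offers) {
        acceptedSlots.addAll(offers);          // accept everything that is offered
        job.rescaleTo(acceptedSlots.size());   // scale the job to the slots we now hold
    }
}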

If there is a way to communicate with K8s from within Flink, then we could
also implement a mode similar to Flink's YARN integration. The K8s RM would
then ask for new pods to be started if the JM needs more slots.
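
Just to illustrate that idea (this is not an agreed-upon design): with a
Kubernetes client library such as fabric8's kubernetes-client, asking for an
additional TaskManager pod could look roughly like the sketch below. The
namespace, labels, image and container arguments are placeholders:

// Rough illustration: how a Kubernetes-aware ResourceManager might request an
// additional TaskManager pod. Assumes the fabric8 kubernetes-client library;
// namespace, labels, image and args are placeholders.
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class TaskManagerPodLauncherSketch {

    public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            Pod taskManagerPod = new PodBuilder()
                .withNewMetadata()
                    .withGenerateName("flink-taskmanager-")
                    .addToLabels("app", "flink")
                    .addToLabels("component", "taskmanager")
                .endMetadata()
                .withNewSpec()
                    .addNewContainer()
                        .withName("taskmanager")
                        .withImage("flink:1.5")                          // placeholder image
                        .withArgs("taskmanager.sh", "start-foreground")  // placeholder args
                    .endContainer()
                .endSpec()
                .build();

            // Called whenever the JobMaster needs more slots than are available.
            client.pods().inNamespace("flink").create(taskManagerPod);
        }
    }
}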

Unfortunately, the per-job mode on K8s won't make it into Flink 1.5. But I'm
confident that the community will address this with Flink 1.6.

Cheers,
Till



Re: Migration to Flip6 Kubernetes

Posted by Eron Wright <er...@gmail.com>.
It would be helpful to expand on how, in job mode, the job graph would be
produced. The phrase 'which contains the single job you want to execute'
has a few meanings; I believe Till means a serialized job graph, not an
executable JAR w/ main method. Till, is that correct?
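
To make the distinction concrete: a serialized job graph could, for example,
be produced on the client side by extracting it from the user program and
writing it to a file, roughly as in the sketch below (the example pipeline,
output path and use of plain Java serialization are assumptions, not
necessarily how Flink would do it):

// Rough sketch: produce a serialized JobGraph from a user program on the
// client, as opposed to shipping an executable JAR with a main method.
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializeJobGraphSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder pipeline standing in for the real user job.
        env.fromElements(1, 2, 3).print();

        // Extract the JobGraph instead of executing the program.
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("job.graph"))) {
            out.writeObject(jobGraph);
        }
    }
}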


Re: Migration to Flip6 Kubernetes

Posted by Edward Rojas <ed...@gmail.com>.
Hi Till,

Thanks for the information. We are using the session cluster and it is
working quite well, but we would like to benefit from the new per-job mode
in order to have better control over the jobs running on the cluster.

I took a look at the YarnJobClusterEntrypoint and I now see how this is
planned to be done, but if I understand correctly, in the current state it is
not possible to start a job cluster on Kubernetes, as there are only concrete
implementations for YARN and Mesos?

The objective being to have a Flink cluster running in per-job mode and able
to execute several self-contained jobs, I imagine the idea would also be to
have a Kubernetes-specific implementation of the ResourceManager that would
be initialized alongside the TaskManagers and would listen for the
"self-contained jobs" to join, assign resources and start the execution of
each job, each one with its own JobManager?

Is my understanding correct?
Is the per-job mode on Kubernetes planned to be included in 1.5?

Regards,
Edward





Re: Migration to Flip6 Kubernetes

Posted by Till Rohrmann <tr...@apache.org>.
Hi Edward,

you're right that Flink's Kubernetes documentation has not been updated
with respect to Flip-6. This will be one of the tasks during the Flink 1.5
release testing and is still pending.

A Flink cluster can be run in two modes: session mode vs per-job mode. The
former starts a cluster to which you can submit multiple jobs. The cluster
shares the same ResourceManager and a Dispatcher which is responsible for
spawning JobMasters which execute a single job each. The latter starts a
Flink cluster which is pre-initialized with a JobGraph and only runs this
job. Here we also start a ResourceManager and a MiniDispatcher whose job it
is to simply start a single JobMaster with the pre-initialized JobGraph.

StandaloneSessionClusterEntrypoint is the entrypoint for the session mode.

The JobClusterEntrypoint is the entrypoint for the per-job mode. Take a
look at YarnJobClusterEntrypoint to see how the entrypoint retrieves the
JobGraph from HDFS and then automatically starts executing it. There is no
script which directly starts this entrypoint, but the YarnClusterDescriptor
uses it when `deployJobCluster` is called.

Depending on what you want to achieve (either building generic K8s images to
which you can submit any number of Flink jobs, or building a special image
which contains the single job you want to execute), you have to call into
either the SessionClusterEntrypoint or the JobClusterEntrypoint. When
starting a session cluster, you can use bin/flink run to submit a job to
this cluster.

Let me know if you have other questions.

Cheers,
Till
