You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by James Bucher <jb...@expedia.com> on 2017/04/20 16:20:11 UTC

Idempotent Job Submission

Hey all,

I have been doing some digging to see if there is a good way to do an idempotent job submission. I was hoping to write a job submission agent that does the following:

  1.  Checks to see if the cluster is running yet (can contact a JobManager)
  2.  Checks to see if the job it is watching is running.
  3.  Submits the job if it is not yet running.
  4.  Retry if there are any issues.

Specifically at the moment there doesn’t seem to be any functionality for submitting a job if it doesn’t exist. The current interface creates a situation where a race condition is possible (as far as I can tell):

For example if the following sequence of events occurs:

  1.  JobManager fails and a new Leader is re-elected:
     *   JobManager Asynchronously starts restoring jobs: here<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L300>
  2.  Client Calls to list currently running jobs (before jobs are restored) and gets back an incomplete list of running jobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1009> because SubmitJob registers jobs in currentJobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1300>
  3.  Client Assumes Job is no longer running so uses HTTP/CLI/Whatever to restore job.
  4.  Current interfaces don’t pass in the same JobID (a new one is generated for each submit) so a new Job is submitted with a new JobID
  5.  JobManager restores previous instance of the running Job
  6.  Now there are 2 instances of the job running in the cluster.

While the above state is pretty unlikely to hit when one is submitting jobs manually, it seems to me that an agent like the above might end up hitting it if the cluster was having trouble with JobManagers failing.

I can see that FLIP-6<https://issues.apache.org/jira/browse/FLINK-4319> is rewriting the whole JobManager itself. From my reading of the current code base this work is 1/2 way done in master.

From my reading of the code/docs it seems that from the submission side the expectation for Docker/Kubernetes is that you will create two sets of containers:

  1.  A JobMaster/ResourceManager container that contains the user’s job in some form (jar or as a serialized JobGraph).
  2.  A TaskManager container which is either generic or potentially has user libs (up to the implementer/cluster maintainer)

As I currently understand the code the JobMaster instances will:

  1.  Start up a JobMasterRunner which connects to the Leader service and creates a JobMaster with the supplied JobGraph (which I assume will always have the same JobID for restore purposes).
  2.  When the node is granted leadership the JobMasterRunner starts the JobMaster which will schedule the ExecutionGraph which it created from the supplied JobGraph.

This all seems fine for a new job submission but the since the restore logic is not yet implemented I am wondering what the way that people will interact with clusters for job submission. From this doc<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077> it appears that the current JobManager infrastructure will instead become a “FlinkDispatcher". Is the intent to have the savepoint launch restore logic in the FlinkDispatcher and have it control the Job upgrade lifecycle?

We are currently looking at running Flink on Kubernetes. FLIP-6 looks to organize that much better than the way things currently work. Specifically for us we are looking to implement a clean way to have clients have a clear deployment/upgrade path for Flink jobs that can be integrated into automated build pipelines and such. Is the intention on the new system to have another orchestration layer for upgrading jobs or will the JobMaster itself handle those situations?

To me the JobMaster seems like the correct place to do the job upgrade coordination because its the single point of control for a single job. Then, for example on kubernetes, one would just have to re-launch the JobMaster containers and it would take care of the rest in the JobMaster logic to consolidate the upgraded JobGraph. On the other hand this might not fit cleanly into the separation of concerns that currently exists within the JobManager.

I am also wondering what work needs doing on FLIP-6. Overall it closely aligns with what we are trying to do on our end to make Flink easier to use so I might get some time to help out with this effort.

Thanks,
James bucher

Re: Idempotent Job Submission

Posted by Till Rohrmann <tr...@apache.org>.
Hi James,

I think if you keep the JobGraph around and don’t generate it by
re-executing the user code, then you will have the same JobID when
re-submitting this JobGraph. This should allow you to do idempotent job
submissions.

However, if you use Flink’s high availability mode, then you should not
have to re-submit a job, because the JobManager will automatically retrieve
all submitted jobs (given a successful job submission before).

Concerning the Flip-6 work, you’re right. It is an ongoing effort and we
try to replace more and more parts with the new code base. The way you
describe the interaction with the JobMasterRunner and the JobMaster is
right.

I think in the future the Dispatcher component will be responsible for
storing a submitted job persistently such that it can be retrieved in case
of a failure. It will also be the responsibility of the Dispatcher to
monitor the running JobMasters and to restart them in case of a failure.
Probably the job upgrade path will also go through the Dispatcher or the
Client, because they know how to stop a job and how to spawn a new JobMaster
with the new job. The JobMaster should only be responsible for running a
single version of a job.

How the upgrade story works with Kubernetes, I cannot tell in detail at the
moment. If it is not possible to talk to Kubernetes from within a
containerized application in order to start new containers, then I assume
that an external layer (e.g. the client) has to do the job upgrade
operation. Otherwise it could work like in the Yarn and Mesos scenario.

At the moment we have the following bigger threads wrt Flip-6 going on:

   - Client side integration with Flip-6 which will most likely mean to
   implement a REST based client
   - Dispatcher component for multi-tenancy
   - Proper fencing for split brain situations
   - Port Mesos to Flip-6 (being worked on)
   - Make Yarn implementation elastic (allowing to allocate and de-allocate
   containers)
   - Testing

We are happy about every helping hand because Flip-6 is a lot of work. Once
the feature freeze for Flink 1.3 is reached (probably next week) I will
spend some time refining these bigger steps and creating JIRA issues for
them.

Cheers,
Till
​

On Thu, Apr 20, 2017 at 6:20 PM, James Bucher <jb...@expedia.com> wrote:

> Hey all,
>
> I have been doing some digging to see if there is a good way to do an
> idempotent job submission. I was hoping to write a job submission agent
> that does the following:
>
>   1.  Checks to see if the cluster is running yet (can contact a
> JobManager)
>   2.  Checks to see if the job it is watching is running.
>   3.  Submits the job if it is not yet running.
>   4.  Retry if there are any issues.
>
> Specifically at the moment there doesn’t seem to be any functionality for
> submitting a job if it doesn’t exist. The current interface creates a
> situation where a race condition is possible (as far as I can tell):
>
> For example if the following sequence of events occurs:
>
>   1.  JobManager fails and a new Leader is re-elected:
>      *   JobManager Asynchronously starts restoring jobs: here<
> https://github.com/apache/flink/blob/release-1.2/
> flink-runtime/src/main/scala/org/apache/flink/runtime/
> jobmanager/JobManager.scala#L300>
>   2.  Client Calls to list currently running jobs (before jobs are
> restored) and gets back an incomplete list of running jobs<
> https://github.com/apache/flink/blob/release-1.2/
> flink-runtime/src/main/scala/org/apache/flink/runtime/
> jobmanager/JobManager.scala#L1009> because SubmitJob registers jobs in
> currentJobs<https://github.com/apache/flink/blob/release-
> 1.2/flink-runtime/src/main/scala/org/apache/flink/
> runtime/jobmanager/JobManager.scala#L1300>
>   3.  Client Assumes Job is no longer running so uses HTTP/CLI/Whatever to
> restore job.
>   4.  Current interfaces don’t pass in the same JobID (a new one is
> generated for each submit) so a new Job is submitted with a new JobID
>   5.  JobManager restores previous instance of the running Job
>   6.  Now there are 2 instances of the job running in the cluster.
>
> While the above state is pretty unlikely to hit when one is submitting
> jobs manually, it seems to me that an agent like the above might end up
> hitting it if the cluster was having trouble with JobManagers failing.
>
> I can see that FLIP-6<https://issues.apache.org/jira/browse/FLINK-4319>
> is rewriting the whole JobManager itself. From my reading of the current
> code base this work is 1/2 way done in master.
>
> From my reading of the code/docs it seems that from the submission side
> the expectation for Docker/Kubernetes is that you will create two sets of
> containers:
>
>   1.  A JobMaster/ResourceManager container that contains the user’s job
> in some form (jar or as a serialized JobGraph).
>   2.  A TaskManager container which is either generic or potentially has
> user libs (up to the implementer/cluster maintainer)
>
> As I currently understand the code the JobMaster instances will:
>
>   1.  Start up a JobMasterRunner which connects to the Leader service and
> creates a JobMaster with the supplied JobGraph (which I assume will always
> have the same JobID for restore purposes).
>   2.  When the node is granted leadership the JobMasterRunner starts the
> JobMaster which will schedule the ExecutionGraph which it created from the
> supplied JobGraph.
>
> This all seems fine for a new job submission but the since the restore
> logic is not yet implemented I am wondering what the way that people will
> interact with clusters for job submission. From this doc<
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077>
> it appears that the current JobManager infrastructure will instead become a
> “FlinkDispatcher". Is the intent to have the savepoint launch restore logic
> in the FlinkDispatcher and have it control the Job upgrade lifecycle?
>
> We are currently looking at running Flink on Kubernetes . FLIP-6 looks to
> organize that much better than the way things currently work. Specifically
> for us we are looking to implement a clean way to have clients have a clear
> deployment/upgrade path for Flink jobs that can be integrated into
> automated build pipelines and such. Is the intention on the new system to
> have another orchestration layer for upgrading jobs or will the JobMaster
> itself handle those situations?
>
> To me the JobMaster seems like the correct place to do the job upgrade
> coordination because its the single point of control for a single job.
> Then, for example on kubernetes, one would just have to re-launch the
> JobMaster containers and it would take care of the rest in the JobMaster
> logic to consolidate the upgraded JobGraph. On the other hand this might
> not fit cleanly into the separation of concerns that currently exists
> within the JobManager.
>
> I am also wondering what work needs doing on FLIP-6. Overall it closely
> aligns with what we are trying to do on our end to make Flink easier to use
> so I might get some time to help out with this effort.
>
> Thanks,
> James bucher
>