Posted to user@flink.apache.org by Geoffrey Mon <ge...@gmail.com> on 2017/02/09 23:34:51 UTC

"Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job
can be found here if it would help in any way:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by
the previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it
completes, the second job is submitted by the YARN client:

<snip>
02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works
for me), then the second job runs fine. However, if the input file for my
first job is large and the first job takes more than a minute or so to
complete, Flink will not acknowledge receiving the next job; the Flink web
console does not show any new jobs, and the Flink logs do not mention receiving
any new jobs after the first job has completed. The YARN client's job
submission times out after Flink does not respond:

Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)

I have tried increasing akka.client.timeout to large values such as 1200s
(20 minutes), but even then Flink does not acknowledge or execute any other
jobs and there is the same timeout error. Does anyone know how I can get
Flink to execute all of the jobs properly?
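For reference, I raised the timeout in conf/flink-conf.yaml on the machine
running the YARN client, roughly like this (value format as I recall it from
the configuration docs; double-check for your Flink version):

```
# conf/flink-conf.yaml on the client machine
akka.client.timeout: 1200 s
```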

Cheers,
Geoffrey Mon

Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

Posted by Geoffrey Mon <ge...@gmail.com>.
Hi Gordon,

I was using a Flink session that lasted as long as the plan jar was still
running (which I believe would be a "per job yarn cluster"), by submitting
a command to EMR that looked like:
flink run -m yarn-cluster -yn 5 [jar] [jar arguments]

Cheers,
Geoffrey

On Fri, Feb 17, 2017 at 12:09 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Geoffrey,
>
> Thanks for investigating and updating on this. Good to know that it is
> working!
>
> Just to clarify, was your series of jobs submitted to a “yarn session +
> regular bin/flink run”, or “per job yarn cluster”?
> I’m asking just to make sure of the limitations Robert mentioned.
>
> Cheers,
> Gordon
>
>
> On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geofbot@gmail.com)
> wrote:
>
> Hi Robert,
>
> Thanks for your reply. I've done some further testing and (hopefully)
> solved the issue; this turned out to be a red herring.  After discovering
> that the same issue manifested itself when testing on my local machine, I
> found that multiple jobs can be submitted from a main() function for both
> temporary and permanent Flink YARN clusters, and that the issue was not
> with Flink or with YARN, but with my job file.
>
> In one part of my job, I need to fill in missing components of a vector
> with zeroes. I did this by combining the vector DataSet with another
> DataSet containing indexed zeroes using a union operation and an
> aggregation operation. In my problematic job, I used
> ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList of
> Tuples containing an index and a zero. However, for input files with very
> large parameters, I needed to generate very large length DataSets of
> zeroes, and since I was using fromElements, the client needed to send the
> Flink runtime all of the elements with which to create the DataSet (lots
> and lots of zeroes). This caused the job to time out before execution,
> making me think that the job had not been properly received by the runtime.
>
> I've replaced this with ExecutionEnvironment#generateSequence and a map
> function mapping each number of the generated sequence to a tuple with a
> zero. This has solved the issue and my job seems to be running fine for now.
> (
> https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370
> )
>
> Again, thank you very much for your help.
>
> Sincerely,
> Geoffrey
>
> On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger <rm...@apache.org>
> wrote:
>
> Hi Geoffrey,
>
> I think the "per job yarn cluster" feature does probably not work for one
> main() function submitting multiple jobs.
> If you have a yarn session + regular "flink run" it should work.
>
> On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <ge...@gmail.com> wrote:
>
> Just to clarify, is Flink designed to allow submitting multiple jobs from
> a single program class when using a YARN cluster? I wasn't sure based on
> the documentation.
>
> Cheers,
> Geoffrey
>
>
> On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <ge...@gmail.com> wrote:
>
> <snip>

Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Geoffrey,

Thanks for investigating and updating on this. Good to know that it is working!

Just to clarify, was your series of jobs submitted to a “yarn session + regular bin/flink run”, or “per job yarn cluster”?
I’m asking just to make sure of the limitations Robert mentioned.

Cheers,
Gordon


On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geofbot@gmail.com) wrote:

Hi Robert,

Thanks for your reply. I've done some further testing and (hopefully) solved the issue; this turned out to be a red herring.  After discovering that the same issue manifested itself when testing on my local machine, I found that multiple jobs can be submitted from a main() function for both temporary and permanent Flink YARN clusters, and that the issue was not with Flink or with YARN, but with my job file.

In one part of my job, I need to fill in missing components of a vector with zeroes. I did this by combining the vector DataSet with another DataSet containing indexed zeroes using a union operation and an aggregation operation. In my problematic job, I used ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList of Tuples containing an index and a zero. However, for input files with very large parameters, I needed to generate very large length DataSets of zeroes, and since I was using fromElements, the client needed to send the Flink runtime all of the elements with which to create the DataSet (lots and lots of zeroes). This caused the job to time out before execution, making me think that the job had not been properly received by the runtime.

I've replaced this with ExecutionEnvironment#generateSequence and a map function mapping each number of the generated sequence to a tuple with a zero. This has solved the issue and my job seems to be running fine for now.
(https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370)

Again, thank you very much for your help.

Sincerely,
Geoffrey

On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger <rm...@apache.org> wrote:
Hi Geoffrey,

I think the "per job yarn cluster" feature does probably not work for one main() function submitting multiple jobs.
If you have a yarn session + regular "flink run" it should work. 

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <ge...@gmail.com> wrote:
Just to clarify, is Flink designed to allow submitting multiple jobs from a single program class when using a YARN cluster? I wasn't sure based on the documentation.

Cheers,
Geoffrey


On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <ge...@gmail.com> wrote:
<snip>


Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

Posted by Geoffrey Mon <ge...@gmail.com>.
Hi Robert,

Thanks for your reply. I've done some further testing and (hopefully)
solved the issue; this turned out to be a red herring.  After discovering
that the same issue manifested itself when testing on my local machine, I
found that multiple jobs can be submitted from a main() function for both
temporary and permanent Flink YARN clusters, and that the issue was not
with Flink or with YARN, but with my job file.

In one part of my job, I need to fill in missing components of a vector
with zeroes. I did this by combining the vector DataSet with another
DataSet containing indexed zeroes using a union operation and an
aggregation operation. In my problematic job, I used
ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList of
Tuples containing an index and a zero. However, input files with very
large parameters required correspondingly long DataSets of zeroes, and
since I was using fromElements, the client had to send the Flink runtime
every element with which to create the DataSet (lots and lots of zeroes).
This caused the submission to time out before execution, making me think
that the job had not been properly received by the runtime.

I've replaced this with ExecutionEnvironment#generateSequence and a map
function mapping each number of the generated sequence to a tuple with a
zero. This has solved the issue and my job seems to be running fine for now.
(
https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370
)
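In sketch form, the change looks roughly like this (Flink DataSet API; the
names and sizes here are illustrative, not copied from the actual job):

```java
// Sketch only; names and sizes are illustrative, not the actual job code.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
import java.util.List;

public class ZeroPaddingSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        long n = 50_000_000L; // number of zero entries, derived from the input

        // Before (problematic): the client materializes every element and
        // serializes all of them into the job submission.
        List<Tuple2<Long, Double>> zeros = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            zeros.add(new Tuple2<>(i, 0.0));
        }
        DataSet<Tuple2<Long, Double>> zeroSetBefore = env.fromCollection(zeros);

        // After (fixed): only the sequence bounds travel with the job graph;
        // the tuples are produced in parallel on the TaskManagers.
        DataSet<Tuple2<Long, Double>> zeroSet = env
            .generateSequence(0L, n - 1)
            .map(new MapFunction<Long, Tuple2<Long, Double>>() {
                @Override
                public Tuple2<Long, Double> map(Long i) {
                    return new Tuple2<>(i, 0.0);
                }
            });
    }
}
```

The point is that generateSequence ships only the sequence bounds with the
job graph, whereas a client-side collection gets serialized element by
element into the submission itself.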

Again, thank you very much for your help.

Sincerely,
Geoffrey

On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger <rm...@apache.org> wrote:

Hi Geoffrey,

I think the "per job yarn cluster" feature does probably not work for one
main() function submitting multiple jobs.
If you have a yarn session + regular "flink run" it should work.

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <ge...@gmail.com> wrote:

Just to clarify, is Flink designed to allow submitting multiple jobs from a
single program class when using a YARN cluster? I wasn't sure based on the
documentation.

Cheers,
Geoffrey


On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <ge...@gmail.com> wrote:

<snip>

Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

Posted by Robert Metzger <rm...@apache.org>.
Hi Geoffrey,

I think the "per job yarn cluster" feature probably does not work for one
main() function submitting multiple jobs.
If you have a yarn session + a regular "flink run", it should work.
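Roughly (flags from memory; check the YARN setup docs for your version):

```
# Start a detached YARN session with 5 TaskManagers:
./bin/yarn-session.sh -n 5 -d

# Then submit the multi-job program to that session with a plain run:
./bin/flink run /path/to/plan.jar <jar arguments>
```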

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon <ge...@gmail.com> wrote:

> Just to clarify, is Flink designed to allow submitting multiple jobs from
> a single program class when using a YARN cluster? I wasn't sure based on
> the documentation.
>
> Cheers,
> Geoffrey
>
>
> On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <ge...@gmail.com> wrote:
>
>> <snip>
>

Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

Posted by Geoffrey Mon <ge...@gmail.com>.
Just to clarify, is Flink designed to allow submitting multiple jobs from a
single program class when using a YARN cluster? I wasn't sure based on the
documentation.

Cheers,
Geoffrey

On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon <ge...@gmail.com> wrote:

> <snip>