Posted to user@flink.apache.org by jp...@free.fr on 2018/10/09 07:57:36 UTC
JobManager did not respond within 60000 ms
I have a streaming job that runs on a standalone cluster. The Flink version is 1.4.1. Everything was working until now, but since I added new processing steps, I can no longer start my job. I get this exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
... 11 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
... 12 more
I see a very strange behavior: when I comment out a function (any one, for example a FilterFunction, whether it was present before my modification or added by it), the job starts again.
I tried to change the configuration (akka.client.timeout and akka.framesize) without success.
This is my flink-conf.yaml
jobmanager.rpc.address: myhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 128
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 100
taskmanager.memory.preallocate: false
taskmanager.data.port: 6121
parallelism.default: 1
taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr
blob.storage.directory: /dohdev/flink/tmp/blob
jobmanager.web.port: -1
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /dohdev/flink
high-availability.cluster-id: dev
high-availability.storageDir: file:////mnt/metaflink
high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 5 s
zookeeper.sasl.disable: true
blob.service.cleanup.interval: 60
I launch the job with this command: bin/flink run -d myjar.jar
I have attached a graph of my job when it works (Graph.PNG).
Do you have any idea what the problem is?
Thanks.
Julien
Re: JobManager did not respond within 60000 ms
Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi again,
Glad that you solved your problem :)
Splitting code into smaller functions has its advantages, but more operators/tasks means more overhead for JobManager/TaskManager to manage them. Usually that’s not a big issue, but as I said, you were running your cluster on extremely low memory settings.
Piotrek
> On 9 Oct 2018, at 18:09, jpreisner@free.fr wrote:
>
> Hi Piotrek,
>
> Thank you for your answer. In fact, I did need to increase the memory of the JobManager (I had tried it, but I had not restarted Flink ...).
>
> I will also work on optimization. I thought it was good practice to create as many functions as possible based on their functional meaning (for example: create two FilterFunctions that have different functional meanings). So I will try to have fewer functions (for example: merge my two FilterFunctions into one).
>
> Thanks again Piotrek !
>
> Julien.
Re: JobManager did not respond within 60000 ms
Posted by jp...@free.fr.
Hi Piotrek,
Thank you for your answer. In fact, I did need to increase the memory of the JobManager (I had tried it, but I had not restarted Flink ...).
I will also work on optimization. I thought it was good practice to create as many functions as possible based on their functional meaning (for example: create two FilterFunctions that have different functional meanings). So I will try to have fewer functions (for example: merge my two FilterFunctions into one).
Thanks again Piotrek !
Julien.
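The idea of merging two FilterFunctions into one can be sketched roughly as follows. This is only a minimal illustration, not code from the job discussed in this thread: SimpleFilter is a hypothetical stand-in for Flink's FilterFunction (so the example compiles without Flink on the classpath), and isPositive and isEven are invented checks chosen purely for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MergedFilterSketch {

    // Hypothetical stand-in for Flink's FilterFunction<T>, so this compiles without Flink.
    // (Flink's own filter method additionally declares "throws Exception".)
    public interface SimpleFilter<T> {
        boolean filter(T value);
    }

    // Two checks with different functional meanings, kept as separate named methods.
    public static boolean isPositive(int value) {
        return value > 0;
    }

    public static boolean isEven(int value) {
        return value % 2 == 0;
    }

    // One filter object that applies both checks; a value passes only if both accept it.
    public static SimpleFilter<Integer> positiveAndEven() {
        return value -> isPositive(value) && isEven(value);
    }

    // Applies a filter to a list, standing in for a single filter step in a pipeline.
    public static List<Integer> apply(SimpleFilter<Integer> filter, List<Integer> input) {
        return input.stream().filter(filter::filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(-2, 1, 2, 3, 4);
        System.out.println(apply(positiveAndEven(), input)); // prints [2, 4]
    }
}
```

Keeping each check as its own named method preserves the functional meaning in the code, while the pipeline itself declares one filter step instead of two.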
----- Original message -----
From: "Piotr Nowojski" <pi...@data-artisans.com>
To: jpreisner@free.fr
Cc: user@flink.apache.org
Sent: Tuesday, 9 October 2018 10:37:58
Subject: Re: JobManager did not respond within 60000 ms
Hi,
You have quite a complicated job graph and very low memory settings for the JobManager and TaskManager. Long GC pauses might be causing this problem.
Secondly, quite a few Google search results for this error point toward high-availability issues. Have you read those previously reported problems?
Thanks, Piotrek
Re: JobManager did not respond within 60000 ms
Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,
You have quite a complicated job graph and very low memory settings for the JobManager and TaskManager. Long GC pauses might be causing this problem.
Secondly, quite a few Google search results <https://www.google.com/search?q=flink+JobManager+did+not+respond+within+60000+ms&rlz=1C5CHFA_enDE749DE749&oq=flink+JobManager+did+not+respond+within+60000+ms&aqs=chrome..69i57.1113j0j7&sourceid=chrome&ie=UTF-8> for this error point toward high-availability issues. Have you read those previously reported problems?
Thanks, Piotrek