Posted to user@flink.apache.org by satya brat <br...@gmail.com> on 2020/01/20 08:33:57 UTC

Flink Job Submission Fails even though job is running

I am using a standalone Flink cluster with 1 jobManager and n
taskManagers. When I try to submit a job via the command line, the job
submission fails with the error message
org.apache.flink.client.program.ProgramInvocationException: Could not
submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e).
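
(For reference, the submission roughly corresponds to the command below, reconstructed from the Program Arguments in the client log further down; the main class after -c is a placeholder of mine, not the actual class name:)

    ./bin/flink run -d -c com.example.RelayerJob /home/fse/flink-kafka-relayer-0.2.jar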

On the jobManager instance, everything works fine until the job switches from
DEPLOYING to RUNNING. After that, once the akka ask timeout expires, I see the
following stacktrace:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-177004106]] after [100000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:745)

I went through the Flink code on GitHub and all the steps required to
execute a job seem to run fine. However, when the jobManager has to
send the job submission ack back to the Flink client that triggered the job,
the JobSubmitHandler times out on the akka dispatcher, which, to my
understanding, takes care of communicating with the job client.

The Flink job consists of 1 source (Kafka), 2 operators and 1 sink (custom
sink). The following link shows the jobManager logs:
https://pastebin.com/raw/3GaTtNrG
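
(For context only, a rough sketch of what such a topology typically looks like in the DataStream API; this is not the actual relayer code, and the topic name, operators and sink stub are placeholders of mine. It assumes the universal Kafka connector, flink-connector-kafka_2.11.)

    import java.util.Properties;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class RelayerTopologySketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
            props.setProperty("group.id", "relayer");             // placeholder group id

            // 1 Kafka source
            DataStream<String> input = env.addSource(
                    new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

            input
                    // operator 1 (placeholder transformation)
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value.trim();
                        }
                    })
                    // operator 2 (placeholder filter)
                    .filter(new FilterFunction<String>() {
                        @Override
                        public boolean filter(String value) {
                            return !value.isEmpty();
                        }
                    })
                    // 1 custom sink (stub below)
                    .addSink(new NoOpSink());

            env.execute("kafka-relayer-sketch");
        }

        /** Stand-in for the custom sink; the real job would write to an external system here. */
        private static class NoOpSink implements SinkFunction<String> {
            @Override
            public void invoke(String value) {
                // intentionally empty in this sketch
            }
        }
    }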

Once the dispatcher times out, all other Flink UI calls also time out with the
same exception.

Following are the logs of the Flink client that was used to submit the job via
the command line.

2019-09-28 19:34:21,321 INFO  org.apache.flink.client.cli.CliFrontend                       - --------------------------------------------------------------------------------
2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend                       -  Starting Command Line Client (Version: 1.8.0, Rev:<unknown>, Date:<unknown>)
2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend                       -  OS current user: root
2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend                       -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend                       -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.5-b02
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -  Maximum heap size: 2677 MiBytes
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -  JAVA_HOME: (not set)
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -  No Hadoop Dependency available
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -  JVM Options:
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -     -Dlog.file=/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/log/flink-root-client-fulfillment-stream-processor-flink-task-manager-2-8047357.log
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -     -Dlog4j.configuration=file:/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/conf/log4j-cli.properties
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -     -Dlogback.configurationFile=file:/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/conf/logback.xml
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -  Program Arguments:
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -     run
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -     -d
2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend                       -     -c
2019-09-28 19:34:21,324 INFO  org.apache.flink.client.cli.CliFrontend                       -     /home/fse/flink-kafka-relayer-0.2.jar
2019-09-28 19:34:21,324 INFO  org.apache.flink.client.cli.CliFrontend                       -  Classpath: /var/lib/fulfillment-stream-processor/flink-executables/flink-executables/lib/log4j-1.2.17.jar:/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/lib/slf4j-log4j12-1.7.15.jar:/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/lib/flink-dist_2.11-1.8.0.jar:::
2019-09-28 19:34:21,324 INFO  org.apache.flink.client.cli.CliFrontend                       - --------------------------------------------------------------------------------
2019-09-28 19:34:21,328 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, <job-manager-ip>
2019-09-28 19:34:21,328 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-09-28 19:34:21,328 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-09-28 19:34:21,329 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-09-28 19:34:21,329 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-09-28 19:34:21,329 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-09-28 19:34:21,329 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.jmx.class, org.apache.flink.metrics.jmx.JMXReporter
2019-09-28 19:34:21,329 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.jmx.port, 8789
2019-09-28 19:34:21,333 WARN  org.apache.flink.client.cli.CliFrontend                       - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:259)
    at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1230)
    at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1190)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1115)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 5 more
2019-09-28 19:34:21,343 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2019-09-28 19:34:21,545 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2019-09-28 19:34:21,560 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2019-09-28 19:34:21,561 INFO  org.apache.flink.client.cli.CliFrontend                       - Running 'run' command.
2019-09-28 19:34:21,566 INFO  org.apache.flink.client.cli.CliFrontend                       - Building program from JAR file
2019-09-28 19:34:21,744 INFO  org.apache.flink.configuration.Configuration                  - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2019-09-28 19:34:21,896 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
2019-09-28 19:34:21,898 INFO  org.apache.flink.client.cli.CliFrontend                       - Starting execution of program
2019-09-28 19:34:21,898 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Starting program in interactive mode (detached: true)
2019-09-28 19:34:22,594 WARN  org.apache.flink.streaming.api.environment.StreamContextEnvironment  - Job was executed in detached mode, the results will be available on completion.
2019-09-28 19:34:22,632 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, <job-manager-ip>
2019-09-28 19:34:22,632 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-09-28 19:34:22,632 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-09-28 19:34:22,633 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-09-28 19:34:22,633 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-09-28 19:34:22,633 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-09-28 19:34:22,633 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.jmx.class, org.apache.flink.metrics.jmx.JMXReporter
2019-09-28 19:34:22,633 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.jmx.port, 8789
2019-09-28 19:34:22,635 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job f839aefee74aa4483ce8f8fd2e49b69e (detached: true).
2019-09-28 19:36:04,341 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2019-09-28 19:36:04,343 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2019-09-28 19:36:04,343 ERROR org.apache.flink.client.cli.CliFrontend                       - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:250)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.client.cli.CliFrontend$$Lambda$5/1971851377.call(Unknown Source)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
    at org.apache.flink.client.program.rest.RestClusterClient$$Lambda$17/788892554.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture$ExceptionCompletion.run(CompletableFuture.java:1246)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenApply.run(CompletableFuture.java:723)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenCopy.run(CompletableFuture.java:1333)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2361)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)
    at org.apache.flink.runtime.concurrent.FutureUtils$$Lambda$34/1092254958.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture$WhenCompleteCompletion.run(CompletableFuture.java:1298)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$AsyncCompose.exec(CompletableFuture.java:626)
    at java.util.concurrent.CompletableFuture$Async.run(CompletableFuture.java:428)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-177004106]] after [100000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:745)

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
    at org.apache.flink.runtime.rest.RestClient$$Lambda$33/1155836850.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncCompose.exec(CompletableFuture.java:604)
    ... 4 more

I have turned on debug logs for Flink, akka and Kafka but am not able to
figure out what is going wrong. I only have a very basic understanding of
akka, which makes it hard for me to debug this further. Can someone help me
with this? I am running Flink 1.8.0.
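
(For reference, a minimal log4j sketch of how those debug logs can be turned on; the logger names below are the usual ones and are an assumption on my part, not taken from this setup:)

    # in conf/log4j.properties (log4j 1.2, as shipped in lib/ per the client classpath above)
    log4j.logger.org.apache.flink=DEBUG
    log4j.logger.akka=DEBUG
    log4j.logger.org.apache.kafka=DEBUG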

Re: Flink Job Submission Fails even though job is running

Posted by tison <wa...@gmail.com>.
I guess it is either a JM internal error which crashes the dispatcher, or a
race condition so that the returned future never completes, possibly related
to a JDK bug. But again, I never have a log for this case, so I cannot
conclude anything.

Best,
tison.


On Wed, Jan 22, 2020 at 10:49 AM tison <wa...@gmail.com> wrote:

> It is a known issue, reported multiple times, that if you are on an early
> jdk 1.8.x version, upgrading to a later bugfix version makes the issue vanish.
>
> I have never had a log from the jm side when this issue was reported, so I'm
> sorry I am unable to explain more...
>
> Best,
> tison.
>
>
> On Wed, Jan 22, 2020 at 10:46 AM Yang Wang <da...@gmail.com> wrote:
>
>> The "web.timeout" will be used for all web monitor asynchronous
>> operations, including the
>> "DispatcherGateway.submitJob" in the "JobSubmitHandler".
>> So when you increase the timeout, does it still could not work?
>>
>> Best,
>> Yang
>>
>> On Tue, Jan 21, 2020 at 8:57 PM satya brat <br...@gmail.com> wrote:
>>
>>> How does web.timeout help here? The issue is with respect to the akka
>>> dispatcher timing out. The job is submitted to the task managers but the
>>> response doesn't reach the client.
>>>
>>> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang <da...@gmail.com>
>>> wrote:
>>>
>>>> Hi satya,
>>>>
>>>> Maybe the job has been submitted to the Dispatcher successfully, but the
>>>> internal job submission takes
>>>> too long (more than 10s), so it failed with a timeout. Could you
>>>> please set `web.timeout: 30000`
>>>> and run again?
>>>>
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>

Re: Flink Job Submission Fails even though job is running

Posted by tison <wa...@gmail.com>.
It is a known issue, reported multiple times, that if you are on an early jdk
1.8.x version, upgrading to a later bugfix version makes the issue vanish.

I have never had a log from the jm side when this issue was reported, so I'm
sorry I am unable to explain more...
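
(For reference, the JVM line in the client log above reads "1.8/25.5-b02", which looks like a very early JDK 8 update; checking the runtime on the jobManager host is a quick way to confirm before upgrading:)

    $ java -version    # compare against a recent 1.8.0_xxx bugfix release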

Best,
tison.


On Wed, Jan 22, 2020 at 10:46 AM Yang Wang <da...@gmail.com> wrote:

> The "web.timeout" will be used for all web monitor asynchronous
> operations, including the
> "DispatcherGateway.submitJob" in the "JobSubmitHandler".
> So when you increase the timeout, does it still could not work?
>
> Best,
> Yang
>
> On Tue, Jan 21, 2020 at 8:57 PM satya brat <br...@gmail.com> wrote:
>
>> How does web.timeout help here? The issue is with respect to the akka
>> dispatcher timing out. The job is submitted to the task managers but the
>> response doesn't reach the client.
>>
>> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang <da...@gmail.com> wrote:
>>
>>> Hi satya,
>>>
>>> Maybe the job has been submitted to the Dispatcher successfully, but the
>>> internal job submission takes
>>> too long (more than 10s), so it failed with a timeout. Could you
>>> please set `web.timeout: 30000`
>>> and run again?
>>>
>>>
>>>
>>> Best,
>>> Yang
>>>

Re: Flink Job Submission Fails even though job is running

Posted by Yang Wang <da...@gmail.com>.
The "web.timeout" will be used for all web monitor asynchronous operations,
including the
"DispatcherGateway.submitJob" in the "JobSubmitHandler".
So when you increase the timeout, does it still could not work?
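
(For reference, a minimal flink-conf.yaml sketch of the timeouts discussed in this thread; the values are only illustrative, not recommendations:)

    # timeout in ms for asynchronous operations of the web monitor / REST handlers,
    # which covers the DispatcherGateway.submitJob call made by the JobSubmitHandler
    web.timeout: 30000

    # timeout for akka ask calls between Flink components; the 100000 ms in the
    # AskTimeoutException above presumably comes from one of these timeout settings
    akka.ask.timeout: 100 s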

Best,
Yang

On Tue, Jan 21, 2020 at 8:57 PM satya brat <br...@gmail.com> wrote:

> How does web.timeout help here? The issue is with respect to the akka
> dispatcher timing out. The job is submitted to the task managers but the
> response doesn't reach the client.
>
> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang <da...@gmail.com> wrote:
>
>> Hi satya,
>>
>> Maybe the job has been submitted to the Dispatcher successfully, but the
>> internal job submission takes
>> too long (more than 10s), so it failed with a timeout. Could you please
>> set `web.timeout: 30000`
>> and run again?
>>
>>
>>
>> Best,
>> Yang
>>

Re: Flink Job Submission Fails even though job is running

Posted by satya brat <br...@gmail.com>.
How does web.timeout help here? The issue is that the akka dispatcher
times out. The job is submitted to the task managers, but the
response doesn't reach the client.

On Tue, Jan 21, 2020 at 12:34 PM Yang Wang <da...@gmail.com> wrote:

> Hi satya,
>
> Maybe the job has been submitted to the Dispatcher successfully, but the
> internal submission is taking too long (more than 10 s), so the request
> failed with a timeout. Could you please set `web.timeout: 30000`
> and run again?
>
>
>
> Best,
> Yang
>

Re: Flink Job Submission Fails even though job is running

Posted by Yang Wang <da...@gmail.com>.
Hi satya,

Maybe the job has been submitted to the Dispatcher successfully, but the
internal submission is taking too long (more than 10 s), so the request
failed with a timeout. Could you please set `web.timeout: 30000`
and run again?
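
As a quick sanity check (a hedged sketch; the exact log file name depends on how
the standalone cluster was started), you can confirm whether the submission
itself succeeded despite the client-side timeout:

    # on the JobManager host: did the tasks switch from DEPLOYING to RUNNING?
    grep "switched from DEPLOYING to RUNNING" log/flink-*-standalonesession-*.log
    # or, if the REST endpoint is still responsive, list running jobs via the CLI
    ./bin/flink list -r

If the job shows up as RUNNING, only the acknowledgement back to the client is
timing out.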



Best,
Yang
