Posted to user@flink.apache.org by Ivan Yang <iv...@gmail.com> on 2021/08/19 06:29:49 UTC

Just failed while starting

Dear Flink community,

I have recently been running into this issue at job startup. It happens from time to time. Here is the exception from the JobManager:

2021-08-17 01:21:01,944 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Defence raw event prod05_analytics_output -> String to JSON -> (Sink: Malformed JSON Sink, ThreatSight Filter -> Raw Json to ThreatSight Json -> ThreatSight Json to String -> Sink: ThreatSight Sink, Json to CDCA -> (mssp eventlog filter 1 -> mssp eventlog filter 2 -> CDCA to eventlog json -> Flat Map, Sink: Parquet Sink Event Time), mssp json filter -> action filter -> raw json to action json, Filter -> Json to ResponseAlarm, Filter -> json to SecurityEvent) (542/626) (a7be17221c0726a67679091062cfa8dc) switched from DEPLOYING to FAILED on 172.1.200.173:6122-856ad2 @ ip-172-1-200-173.ec2.internal (dataPort=6121).
org.apache.flink.util.FlinkException: Could not mark slot 58af05c3109a0fe8f96ea8936c0783a4 active.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$handleAcceptedSlotOffers$18(TaskExecutor.java:1561) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_302]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_302]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_302]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-08-17 01:21:01,945 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:01,999 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 3150 tasks should be restarted to recover the failed task 7ac3b50525c642dc419b976cfaf0ee0e_541. 
2021-08-17 01:21:02,012 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Event Router prod05 (0f10bafc07e48918f955fb22cbaa5735) switched from state RUNNING to RESTARTING.

I am using a Kubernetes deployment in session mode. We have 4 jobs running in the cluster, and the 3 small jobs have never had the problem. The issue only happens with the one large job, which has 10 times the parallelism of the other, smaller jobs. Can someone help me understand the root cause of the issue and how to avoid it?

Thanks,
Ivan



Re: Just failed while starting

Posted by Chesnay Schepler <ch...@apache.org>.
Can you share the logs from the affected TaskManager and JobManager with us
(ideally at DEBUG level, if available)?
