Posted to user@flink.apache.org by Oleksandra Levchenko <ol...@inria.fr> on 2017/10/19 14:49:58 UTC

Execution Failed (cluster setup Flink+Hadoop), Task Manager was lost/killed

Hi, 

I am running a Flink batch job on a standalone cluster (16 nodes) on top of Hadoop.
The job chain looks like this:

DataSet1 = env.readTextFile (csv on hdfs)
.map
.flatMap
.groupBy
.reduce
.map
.writeAsCsv (DataSet 1)

DataSet2 = env.readTextFile 
.map
.flatMap

DataSet1 = env.readCsvFile (CSV written for DataSet1)
DataSet1.flatJoin(DataSet2)
.groupBy
.reduce
.filter
.count

The job finishes successfully with ~24.3 GB of input data.
When I scale to a larger input (~244.3 GB), it fails with:

java.lang.Exception: TaskManager was lost/killed: 3c0d5310e30f7c52eae95ff97bee85e2 @ grisou-46.nancy.grid5000.fr (dataPort=51359)
	at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
	at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
	at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
	at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
	at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
	at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
	at akka.actor.ActorCell.invoke(ActorCell.scala:486)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Submitting the next job after a cluster restart produces the following error, which I guess is caused by the lost TaskManager:

java.lang.IllegalStateException: Update task on TaskManager 12c673eb2681eb4f1d1cb000e561f1c5 @ grisou-48.nancy.grid5000.fr (dataPort=42326) failed due to:
	at org.apache.flink.runtime.executiongraph.Execution$8.apply(Execution.java:1076)
	at org.apache.flink.runtime.executiongraph.Execution$8.apply(Execution.java:1073)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
	at akka.dispatch.Recover.internal(Future.scala:268)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
	at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
	at scala.util.Try$.apply(Try.scala:192)
	at scala.util.Failure.recover(Try.scala:216)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@grisou-48.nancy.grid5000.fr:49630/user/taskmanager#-1925734821]] after [10000 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	... 1 more 


According to the web monitor, there are 0 finished tasks after the DataSource -> Map -> Map -> Map -> FlatMap -> Reduce chain.
It also looks like the lost/killed TaskManager is not restarting.

Thank you for any help on this issue,

Regards  
Oleksandra