Posted to user@flink.apache.org by ShB <sh...@gmail.com> on 2017/10/12 21:11:58 UTC

Re: Task Manager was lost/killed due to full GC

Hi Stephan,

Thanks for your response!

The task manager being lost/killed has been a recurring problem I've had with Flink
for the last few months, as I try to scale to larger and larger amounts of
data. I would be very grateful for some help figuring out how I can avoid
this.

The program is set up something like this:
/
DataSet<CustomType> data = env.fromCollection(listOfFiles)
        .rebalance()
        .flatMap(new ReadFiles())
        .filter(new FilterData());

DataSet<Tuple8> computation1 = data
        .map(new Compute1())
        .distinct()
        .map(new Compute2())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet<Tuple10> computation2 = data
        .map(new Compute3())
        .distinct()
        .map(new Compute4())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet<Tuple12> finalOP = computation1.join(computation2)
        .where(0, 1)
        .equalTo(0, 1)
        .with(new Join1())
        .sortPartition(0, Order.ASCENDING)
        .setParallelism(1);

finalOP.writeAsCsv("s3://myBucket/myKey.csv");

---

public static final class ReadFiles implements FlatMapFunction<String, CustomType> {
    @Override
    public void flatMap(String fileName, Collector<CustomType> out) throws Exception {
        S3FileReaderAndParser parser = new S3FileReaderAndParser(fileName);
        List<CustomType> dataList = parser.parseFiles();
        for (CustomType data : dataList) {
            out.collect(data);
        }
    }
}
/

The Task Manager is killed/lost during the ReadFiles() flatMap. ReadFiles reads
each of the files from S3 using the AWS S3 Java SDK, then parses and emits each
of the protobufs.
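
For context, S3FileReaderAndParser is a helper class of my own. Roughly, it does
something like the sketch below; the bucket name, the protobuf type
(CustomProto.Record) and the delimited framing are illustrative assumptions rather
than the exact code:

/
// Illustrative sketch only -- the real S3FileReaderAndParser differs in its details.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class S3FileReaderAndParser {
    private final String fileName;

    public S3FileReaderAndParser(String fileName) {
        this.fileName = fileName;
    }

    public List<CustomType> parseFiles() throws Exception {
        // A new client per call is wasteful; shown this way only for brevity.
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        List<CustomType> result = new ArrayList<>();
        try (S3Object object = s3.getObject("myBucket", fileName);
             InputStream in = object.getObjectContent()) {
            CustomProto.Record record;
            // parseDelimitedFrom() returns null at the end of the stream
            while ((record = CustomProto.Record.parseDelimitedFrom(in)) != null) {
                result.add(new CustomType(record));
            }
        }
        return result;
    }
}
/

The detail that matters for this thread is that every file is pulled fully through
the SDK and buffered as a List<CustomType> before anything is emitted downstream.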

And yes, I can find a message like this in the logs about "gated" systems:
2017-10-12 20:46:00,355 WARN  akka.remote.ReliableDeliverySupervisor                       
- Association with remote system [akka.tcp://flink@ip-172-31-8-29:38763] has
failed, address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@ip-172-31-8-29:38763]] Caused by: [Connection refused:
ip-172-31-8-29/172.31.8.29:38763]

Thank you!





Re: Task Manager was lost/killed due to full GC

Posted by ShB <sh...@gmail.com>.
On further investigation, it seems to me that the I/O exception I posted previously
is not the cause of the TM being lost; it's the after-effect of the TM being
shut down and the channel being closed after a record has been emitted but before
it's processed.

Previously, the logs didn't throw up this error, and I'm also unable to
reproduce it every time (I've come across the I/O exception twice so far).
Most of the time, the logs don't contain the I/O exception or any other
error messages.

This is what the logs usually look like (without the I/O exception):
Job Manager:
/
2017-10-12 22:22:41,857 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Container container_1507845873691_0001_01_000008 failed. Exit status: -100
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Diagnostics for container container_1507845873691_0001_01_000008 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Total number of failed containers so far: 1
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 1
2017-10-12 22:22:42,096 INFO  org.apache.flink.yarn.YarnJobManager                         
- Task manager akka.tcp://flink@ip-172-31-43-115:43404/user/taskmanager
terminated.
2017-10-12 22:22:42,210 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:85)) (39/96) (530ca4789a921cab363f241176dac7a8)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
container_1507845873691_0001_01_000008 @
ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
	at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
	at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
	at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
	at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
	at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
	at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,451 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Output
(0c45ba62b56fefd1c1e7bfd68923411d) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
container_1507845873691_0001_01_000008 @
ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
	at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
	at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
	at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
	at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
	at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
	at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,907 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:85)) (1/96) (8cf2869e9786809d1b9b9d12b9467e40)
switched from RUNNING to CANCELING.
/
 
Task Manager:
/
2017-10-12 22:22:38,570 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369],
[G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 5973/17600/17600 MB, NON HEAP: 72/73/-1 MB
(used/committed/max)]
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 3150, Total Capacity: 17138363, Used Memory: 17138364
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace:
47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 22:22:38,631 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369],
[G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 6101/17600/17600 MB, NON HEAP: 72/73/-1 MB
(used/committed/max)]
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 3234, Total Capacity: 17139035, Used Memory: 17139036
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace:
47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 22:22:38,691 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369],
[G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
*2017-10-12 22:22:38,709 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                  
- RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 22:22:38,713 INFO  org.apache.flink.runtime.blob.BlobCache                      
- Shutting down BlobCache
2017-10-12 22:22:38,719 INFO 
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
removed spill file directory
/mnt/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-a5aace05-73ed-4cea-ad07-db86f9f8ce21
2017-10-12 22:22:38,719 INFO 
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
removed spill file directory
/mnt1/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-cb34ffbe-879f-47d4-9df3-6ed2b0dcd799
/





This is what the logs sometimes look like (with the I/O exception):
Job Manager:
/
2017-10-12 19:40:37,669 WARN  akka.remote.ReliableDeliverySupervisor                       
- Association with remote system [akka.tcp://flink@ip-172-31-11-129:43340]
has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Container container_1507836035753_0001_01_000015 failed. Exit status: -100
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Diagnostics for container container_1507836035753_0001_01_000015 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,922 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Total number of failed containers so far: 1
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Container container_1507836035753_0001_01_000013 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Diagnostics for container container_1507836035753_0001_01_000013 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Total number of failed containers so far: 2
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Container container_1507836035753_0001_01_000002 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Diagnostics for container container_1507836035753_0001_01_000002 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Total number of failed containers so far: 3
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Container container_1507836035753_0001_01_000003 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Diagnostics for container container_1507836035753_0001_01_000003 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Total number of failed containers so far: 4
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 1
2017-10-12 19:40:37,923 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 2
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 3
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnJobManager                         
- Task manager akka.tcp://flink@ip-172-31-1-178:33620/user/taskmanager
terminated.
2017-10-12 19:40:37,924 INFO  org.apache.flink.yarn.YarnFlinkResourceManager               
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 4
2017-10-12 19:40:37,925 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:87)) (40/136) (748d815623ff13e6357f351d5aa7b0f4)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
container_1507836035753_0001_01_000015 @
ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
	at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
	at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
	at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
	at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
	at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
	at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 19:40:37,931 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Output
(11771c44eace0a1e32de1c3ca1c60b09) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
container_1507836035753_0001_01_000015 @
ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
	at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
	at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
	at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
	at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
	at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
	at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
/

Task Manager:
/
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 1347/17600/17600 MB, NON HEAP: 73/74/-1 MB
(used/committed/max)]
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 162, Total Capacity: 17111387, Used Memory: 17111388
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace:
48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 19:40:34,959 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363],
[G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage
stats: [HEAP: 1467/17600/17600 MB, NON HEAP: 73/74/-1 MB
(used/committed/max)]
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 196, Total Capacity: 17111659, Used Memory: 17111660
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace:
48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB
(used/committed/max)]
2017-10-12 19:40:35,019 INFO 
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363],
[G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
*2017-10-12 19:40:35,033 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                  
- RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 19:40:35,043 ERROR org.apache.flink.runtime.operators.BatchTask                 
- Error in task code:  CHAIN Partition -> FlatMap (FlatMap at
main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:87))
(86/136)
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O
channel already closed. Could not fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
	at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
	at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at
org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
	at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
	at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
	at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
	at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
	at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not
fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
	at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
	at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
	at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
	at
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
	at
org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
	at
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
	at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
	at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
	at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
	at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
	... 13 more
2017-10-12 19:40:35,050 INFO 
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
removed spill file directory
/mnt/yarn/usercache/hadoop/appcache/application_1507836035753_0001/flink-io-81d98a3a-7a40-438f-93fa-3b1f9dfc1e1d
/

I still can't figure out why the TM shuts down or how to avoid it; it seems like a
memory/GC issue. I was previously able to have the job complete by increasing the
parallelism (i.e. the number of task managers), roughly as shown below. But as my
dataset size has increased, I'm running into this issue again and increasing
parallelism is no longer working.
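
(For completeness, "increasing parallelism" here means submitting the job with
more/larger TaskManager containers on YARN; a command along these lines, where the
exact flags depend on the Flink version and the numbers are only example values:)

/
# Example submission only; container count, memory and parallelism are placeholders
./bin/flink run -m yarn-cluster -yn 8 -ytm 22000 -p 96 processing-job.jar
/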

Any help would be greatly appreciated! 

Thanks




Re: Task Manager was lost/killed due to full GC

Posted by ShB <sh...@gmail.com>.
Hi Stephan,

Apologies, I hit send too soon on the last email. 

So, while trying to debug this, I ran it multiple times on different instance types
(to increase the RAM available), and while digging into the logs I found this to be
the error in the task manager logs:

/
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O
channel already closed. Could not fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
	at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
	at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at
org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
	at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
	at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
	at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
	at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
	at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not
fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
	at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
	at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
	at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
	at
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
	at
org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
	at
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
	at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
	at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
	at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
	at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
	... 13 more
/

Any idea on a fix for this issue? I can't seem to find any further
information on this in the mailing lists.

Thank you.






Re: Task Manager was lost/killed due to full GC

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for the heads-up and for explaining how you resolved the issue!

Best, Fabian

2017-10-18 3:50 GMT+02:00 ShB <sh...@gmail.com>:

> I just wanted to leave an update about this issue, for someone else who
> might
> come across it. The problem was with memory, but it was disk memory and not
> heap/off-heap memory. Yarn was killing off my containers as they exceeded
> the threshold for disk utilization and this was manifesting as Task manager
> was lost/killed or JobClientActorConnectionTimeoutException: Lost
> connection
> to the JobManager. Digging deep into the individual instance node manager
> logs provided some hints about it being a disk issue.
>
> Some fixes for this problem:
> yarn.nodemanager.disk-health-checker.max-disk-utilization-
> per-disk-percentage
> -- can be increased to alleviate the problem temporarily.
> Increasing the disk capacity on each task manager is a more long-term fix.
> Increasing the number of task managers increases available disk memory and
> hence is also a fix.
>
> Thanks!
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Task Manager was lost/killed due to full GC

Posted by ShB <sh...@gmail.com>.
I just wanted to leave an update about this issue for anyone else who might
come across it. The problem was indeed with memory, but it was disk space rather
than heap/off-heap memory. YARN was killing off my containers because they exceeded
the threshold for disk utilization, and this was manifesting as "Task manager
was lost/killed" or "JobClientActorConnectionTimeoutException: Lost connection
to the JobManager". Digging into the node manager logs of the individual
instances provided the hints that it was a disk issue.

Some fixes for this problem:
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
can be increased to alleviate the problem temporarily (example snippet below).
Increasing the disk capacity on each task manager is a more long-term fix.
Increasing the number of task managers is also a fix, since it increases the
total disk space available.
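
(In case it helps, that property is set in yarn-site.xml on the node managers.
Something like the snippet below; 95.0 is only an example value, the usual Hadoop
default being 90.0:)

/
<!-- yarn-site.xml on each NodeManager: raise the per-disk utilization threshold.
     The 95.0 below is only an example value. -->
<property>
  <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
  <value>95.0</value>
</property>
/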

Thanks!


