Posted to user@flink.apache.org by Ustinov Anton <us...@gmail.com> on 2019/09/02 10:46:42 UTC

TaskManager process continues to run after termination

Hello, I have a standalone cluster set up with Flink 1.8. The task manager processes are run as systemd units with the restart policy set to "always". An error occurred during execution of the JobGraph and caused the task manager to terminate. Logs from the task manager:

{"time":"2019-09-02 11:33:14.797","loglevel":"INFO","class":"org.apache.flink.runtime.taskmanager.Task","message":"Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e) switched from RUNNING to FAILED.","host":"clickstream-flink08"}
java.lang.RuntimeException: Buffer pool is destroyed.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 22 more
{"time":"2019-09-02 11:33:14.797","loglevel":"ERROR","class":"org.apache.flink.runtime.taskmanager.Task","message":"FATAL - exception in resource cleanup of task Source: Custom Source (2/8) (0d0fd38e421b5f2ac389303787ea1f54).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
	at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.801","loglevel":"INFO","class":"org.apache.flink.runtime.taskmanager.Task","message":"Freeing task resources for Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.801","loglevel":"ERROR","class":"org.apache.flink.runtime.taskmanager.Task","message":"FATAL - exception in resource cleanup of task Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
	at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.803","loglevel":"ERROR","class":"org.apache.flink.runtime.taskexecutor.TaskExecutor","message":"FATAL - exception in resource cleanup of task Source: Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
	at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.803","loglevel":"ERROR","class":"org.apache.flink.runtime.taskexecutor.TaskManagerRunner","message":"Fatal error occurred while executing the TaskManager. Shutting it down...","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
	at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
	at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 11:33:14.809","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Shutting down remote daemon.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.810","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remote daemon shut down; proceeding with flushing remote transports.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.827","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remoting shut down.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.827","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remoting shut down.","host":"clickstream-flink08"}
{"time":"2019-09-02 11:33:14.836","loglevel":"INFO","class":"org.apache.flink.runtime.rpc.akka.AkkaRpcService","message":"Stopped Akka RPC service.","host":"clickstream-flink08"}

But the task manager process is still alive:

flink     29078  423  7.0 49191076 27790920 ?   Sl   10:13 828:28 java -Djava.net.preferIPv4Stack=true -Dlog.file=/opt/flink/log/flink--taskexecutor-0-clickstream-flink08.log -Dlog4j.configuration=file:/opt/flink/conf/log4j.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml -classpath /opt/flink/lib/flink-cep_2.12-1.8.0.jar:/opt/flink/lib/flink-queryable-state-runtime_2.12-1.8.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.8.0.jar:/opt/flink/lib/flink-table_2.12-1.8.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.8.0.jar::: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf
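
A JVM exits only when its last non-daemon thread finishes, or when System.exit()/Runtime.halt() is called. One possible explanation for the behaviour above (an assumption; the logs alone do not confirm it) is that some non-daemon thread was still running after the "Shutting it down..." message. Since systemd's Restart=always only takes effect once the process actually exits, a JVM kept alive this way would never be restarted. The sketch below, using a hypothetical class NonDaemonThreads and a hypothetical thread name "leftover-worker", lists the live non-daemon threads of the current JVM. For the real process, a thread dump from jstack 29078 shows the same information (daemon threads are marked "daemon" there).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink.
public class NonDaemonThreads {

    /** Names of all live non-daemon threads; the JVM cannot exit normally while any remain. */
    public static List<String> nonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        // Thread.getAllStackTraces() returns a snapshot of all live threads in this JVM.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Simulate a worker that nobody stopped during shutdown.
        Thread leftover = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "leftover-worker");
        leftover.setDaemon(false); // explicit: non-daemon threads keep the JVM alive
        leftover.start();

        System.out.println("Non-daemon threads: " + nonDaemonThreadNames());
        // main returns here, but the JVM keeps running until "leftover-worker" ends.
    }
}
```

Marking such background threads as daemon, or calling System.exit() after a fatal error, are the usual ways a JVM is made to terminate regardless of stray threads.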

Is this acceptable behaviour?

Best regards,
Anton Ustinov