Posted to user@flink.apache.org by VINAYA KUMAR BENDI <vi...@man-es.com> on 2021/03/25 06:27:21 UTC
Flink job repeated restart failure
Dear all,
One of our Flink jobs failed with the exception below. Several attempts to restart the job failed with the same exception each time. The job started successfully only after we changed the file name.
Flink Version: 1.11.2
Exception
2021-03-24 20:13:09,288 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-03-24 20:13:09,288 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-2] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-03-24 20:13:09,304 WARN org.apache.flink.runtime.taskmanager.Task [] - Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1) (8905142514cb25adbd42980680562d31) switched from RUNNING to FAILED.
java.io.IOException: No such file or directory
	at java.io.UnixFileSystem.createFileExclusively(Native Method) ~[?:1.8.0_252]
	at java.io.File.createNewFile(File.java:1012) ~[?:1.8.0_252]
	at org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.createSpillingChannel(SpanningWrapper.java:291) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.updateLength(SpanningWrapper.java:178) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.transferFrom(SpanningWrapper.java:111) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.12-1.11.2.jar:1.11.2]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
2021-03-24 20:13:09,305 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1) (8905142514cb25adbd42980680562d31).
2021-03-24 20:13:09,311 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1) 8905142514cb25adbd42980680562d31.
File: https://github.com/apache/flink/blob/release-1.11.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
Related Jira ID: https://issues.apache.org/jira/browse/FLINK-18811
A similar exception is described in FLINK-18811, which has a fix in 1.12.0. In our case, however, we did not notice any disk failure. Are there any other possible causes of the IOException above?
We are planning to upgrade to the latest Flink version. In the meantime, is there any workaround other than redeploying the job with a different file name?
Kind regards,
Vinaya
Re: Flink job repeated restart failure
Posted by Arvid Heise <ar...@apache.org>.
Hi Vinaya,
java.io.tmpdir is already the fallback, and I'm not aware of any further
level of fallback.
Ensuring that java.io.tmpdir is valid also matters for some third-party
libraries that rely on it (e.g. FileSystem implementations that cache local
files). It's good practice to set it appropriately.
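The lookup order described above can be sketched as follows. This is a minimal, hypothetical helper (TmpDirResolver.resolveTmpDirs is not a Flink API): it assumes the raw io.tmp.dirs value is passed in as a string, or null when the key is commented out as in this thread, and that multiple directories are separated by "," or the platform path separator, as the Flink configuration documentation describes for io.tmp.dirs.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class TmpDirResolver {

    // Hypothetical helper, not Flink code: returns the temp directories that
    // would be used. If io.tmp.dirs is unset or blank, fall back to the JVM's
    // java.io.tmpdir; there is no further fallback beyond that.
    static List<String> resolveTmpDirs(String ioTmpDirs) {
        String value = (ioTmpDirs == null || ioTmpDirs.trim().isEmpty())
                ? System.getProperty("java.io.tmpdir")
                : ioTmpDirs;

        // Assumption: split on "," or the platform path separator (":" on Unix).
        String separators = "," + "|" + Pattern.quote(File.pathSeparator);

        List<String> dirs = new ArrayList<>();
        for (String part : value.split(separators)) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                dirs.add(trimmed);
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        // Key commented out in flink-conf.yaml: java.io.tmpdir is used.
        System.out.println(resolveTmpDirs(null));
        // Explicit setting with two (hypothetical) directories.
        System.out.println(resolveTmpDirs("/data/flink-tmp,/data2/flink-tmp"));
    }
}
```

Because the JVM always defines java.io.tmpdir (on UNIX systems typically /tmp or /var/tmp, unless overridden with -Djava.io.tmpdir), the fallback never ends up empty; the failure mode is a path that does not exist, or is not writable, on the TaskManager host.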
On Fri, Mar 26, 2021 at 6:32 AM vinaya <vi...@man-es.com> wrote:
> Is there a fallback (e.g. /tmp) if io.tmp.dirs and
> System.getProperty("java.io.tmpdir") are both not set?
Re: Flink job repeated restart failure
Posted by vinaya <vi...@man-es.com>.
Hi Arvid,
Thank you for the suggestion.
Indeed, the specified setting was commented out in the Flink configuration
(flink-conf.yaml).
# io.tmp.dirs: /tmp
Is there a fallback (e.g. /tmp) if io.tmp.dirs and
System.getProperty("java.io.tmpdir") are both not set?
We will configure this setting to a valid value as suggested.
Kind regards,
Vinaya
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink job repeated restart failure
Posted by Arvid Heise <ar...@apache.org>.
Hi Vinaya,
SpillingAdaptiveSpanningRecordDeserializer tries to create a spilling file
(SpanningWrapper.createSpillingChannel in your stack trace) in the temp
directory, which you can configure by setting io.tmp.dirs. By default, it's
set to System.getProperty("java.io.tmpdir"), which seems to be invalid in
your case. (Note that the directory has to exist on the TaskManagers.)
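To see why a missing directory produces exactly this error, and to check the configured directories on each TaskManager host before (re)starting a job, here is a minimal sketch using only the JDK. TmpDirCheck, unusableDirs and the example file name are hypothetical, not Flink APIs; the check only covers "exists, is a directory, is writable" and does not replicate Flink's own spilling logic.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class TmpDirCheck {

    // Hypothetical pre-flight check: returns the directories that are
    // missing, not directories, or not writable on this host.
    static List<String> unusableDirs(List<String> dirs) {
        List<String> bad = new ArrayList<>();
        for (String d : dirs) {
            File f = new File(d);
            if (!f.isDirectory() || !f.canWrite()) {
                bad.add(d);
            }
        }
        return bad;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("tmpdir-check");
        File missing = new File(base.toFile(), "does-not-exist");

        // Same JDK call that appears near the top of the stack trace:
        // File.createNewFile() inside a directory that does not exist.
        try {
            new File(missing, "example.tmp").createNewFile();
            System.out.println("created");
        } catch (IOException e) {
            System.out.println("IOException: " + e.getMessage());
        }

        List<String> dirs = new ArrayList<>();
        dirs.add(base.toString());
        dirs.add(missing.getPath());
        System.out.println("unusable: " + unusableDirs(dirs));

        // Clean up the (still empty) scratch directory.
        Files.delete(base);
    }
}
```

On Linux the first println reports the same "No such file or directory" message as in the original log; the exact message text is platform-dependent.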
Best,
Arvid
On Thu, Mar 25, 2021 at 7:27 AM VINAYA KUMAR BENDI <vi...@man-es.com> wrote:
> Is there any other reason(s) for the above mentioned IOException?