Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2020/10/28 07:24:00 UTC

[jira] [Closed] (FLINK-18811) if a disk is damaged, taskmanager should choose another disk for temp dir , rather than throw an IOException, which causes flink job restart over and over again

     [ https://issues.apache.org/jira/browse/FLINK-18811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski closed FLINK-18811.
----------------------------------
    Fix Version/s: 1.12.0
       Resolution: Fixed

Merged commit e38716f into apache:master.

> if a disk is damaged, taskmanager should choose another disk for temp dir , rather than throw an IOException, which causes flink job restart over and over again
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18811
>                 URL: https://issues.apache.org/jira/browse/FLINK-18811
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>         Environment: flink-1.10
>            Reporter: Kai Chen
>            Assignee: Kai Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>         Attachments: flink_disk_error.png
>
>
> I hit this exception when a hard disk was damaged:
> !flink_disk_error.png!
> I checked the code and found that Flink creates a temp file when the record length exceeds 5 MB:
>  
> {code:java}
> // SpillingAdaptiveSpanningRecordDeserializer.java
> if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
>    // create a spilling channel and put the data there
>    this.spillingChannel = createSpillingChannel();
>    ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
>    FileUtils.writeCompletely(this.spillingChannel, toWrite);
> }
> {code}
> The tempDir is picked at random from all `tempDirs`. In YARN mode, one `tempDir` usually corresponds to one hard disk.
>  
> In my opinion, if a hard disk is damaged, the taskmanager should pick another disk (tmpDir) for the spilling channel rather than throw an IOException, which causes the Flink job to restart over and over again.
> Could we change "SpillingAdaptiveSpanningRecordDeserializer" like this:
> {code:java}
> // SpillingAdaptiveSpanningRecordDeserializer.java
> private FileChannel createSpillingChannel() throws IOException {
>    if (spillFile != null) {
>       throw new IllegalStateException("Spilling file already exists.");
>    }
>    // try to find a unique file name for the spilling channel
>    int maxAttempts = 10;
>    String[] tempDirs = this.tempDirs;
>    for (int attempt = 0; attempt < maxAttempts; attempt++) {
>       int dirIndex = rnd.nextInt(tempDirs.length);
>       String directory = tempDirs[dirIndex];
>       spillFile = new File(directory, randomString(rnd) + ".inputchannel");
>       try {
>          if (spillFile.createNewFile()) {
>             return new RandomAccessFile(spillFile, "rw").getChannel();
>          }
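>       } catch (IOException e) {
>          // if there is no tempDir left to try
>          if (tempDirs.length <= 1) {
>             throw e;
>          }
>          LOG.warn("Caught an IOException when creating spill file in: " + directory + ". Attempt " + attempt, e);
>          // drop the failing directory so later attempts only pick from the remaining ones
>          tempDirs = (String[]) ArrayUtils.remove(tempDirs, dirIndex);
>       }
>    }
>    // every attempt failed, either through file name collisions or through I/O errors
>    throw new IOException("Could not create a spill file in any of the temp directories.");
> }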
> {code}
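
The fallback idea proposed above can be tried outside of Flink with a small standalone sketch. This is not the merged fix and not Flink's API: the class name `SpillFileFallback`, the method `createSpillFile`, and the use of `UUID` for file names are all hypothetical, chosen only to illustrate dropping a failing directory and retrying with the remaining ones.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

// Hypothetical standalone helper; it only mirrors the retry idea from the issue.
public class SpillFileFallback {

    /**
     * Tries to create an empty spill file in one of the given directories.
     * A directory that throws an IOException is removed from the candidate
     * list, so later attempts only pick from the remaining directories.
     */
    public static File createSpillFile(List<File> tempDirs, Random rnd, int maxAttempts)
            throws IOException {
        List<File> candidates = new ArrayList<>(tempDirs);
        IOException lastError = null;

        for (int attempt = 0; attempt < maxAttempts && !candidates.isEmpty(); attempt++) {
            int dirIndex = rnd.nextInt(candidates.size());
            File directory = candidates.get(dirIndex);
            File spillFile = new File(directory, UUID.randomUUID() + ".inputchannel");
            try {
                if (spillFile.createNewFile()) {
                    return spillFile;
                }
                // createNewFile() returned false: the name already exists, so retry
            } catch (IOException e) {
                // treat the directory as unusable and stop picking it
                lastError = e;
                candidates.remove(dirIndex);
            }
        }
        throw new IOException("Could not create a spill file in any of " + tempDirs, lastError);
    }
}
```

Unlike the snippet in the issue, this sketch does not rethrow as soon as the last directory fails. It lets the loop end once no candidates remain and then reports the last error as the cause, which is a design choice of the sketch rather than something the issue prescribes.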



--
This message was sent by Atlassian Jira
(v8.3.4#803005)