You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/26 14:37:57 UTC

[flink] branch master updated: [FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c643a2953ba [FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle.
c643a2953ba is described below

commit c643a2953ba44b3b316ba52983932329dc0162e4
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Aug 22 17:31:35 2022 +0800

    [FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle.
    
    This closes #20666
---
 .../src/main/java/org/apache/flink/util/IOUtils.java    | 17 +++++++++++++++++
 .../io/network/partition/hybrid/HsResultPartition.java  | 10 ++++++++++
 .../network/partition/hybrid/HsResultPartitionTest.java | 15 +++++++++++++++
 3 files changed, 42 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 67e971de117..e269d93111d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.util;
 
 import org.slf4j.Logger;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -297,6 +298,22 @@ public final class IOUtils {
         }
     }
 
+    /** Recursively deletes all files under the given directory, keeping the directories. */
+    public static void deleteFilesRecursively(Path path) throws Exception {
+        File[] files = path.toFile().listFiles();
+        if (files == null || files.length == 0) { // not a directory, I/O error, or empty
+            return;
+        }
+
+        for (File file : files) {
+            if (!file.isDirectory()) {
+                Files.deleteIfExists(file.toPath());
+            } else {
+                deleteFilesRecursively(file.toPath()); // descend; the directory itself is kept
+            }
+        }
+    }
+
     /**
      * Deletes the given file.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 308742909ea..5d537aa7466 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -43,6 +44,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -175,6 +177,14 @@ public class HsResultPartition extends ResultPartition {
             int subpartitionId, BufferAvailabilityListener availabilityListener)
             throws IOException {
         checkState(!isReleased(), "ResultPartition already released.");
+
+        // If data file is not readable, throw PartitionNotFoundException to mark this result
+        // partition failed. Otherwise, the partition data is not regenerated, so failover can not
+        // recover the job.
+        if (!Files.isReadable(dataFilePath)) {
+            throw new PartitionNotFoundException(getPartitionId());
+        }
+
         HsSubpartitionView subpartitionView = new HsSubpartitionView(availabilityListener);
         HsDataView diskDataView =
                 fileDataManager.registerNewSubpartition(subpartitionId, subpartitionView);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index b07bcc7e725..c1086b95f7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.util.IOUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -268,6 +270,19 @@ class HsResultPartitionTest {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testCreateSubpartitionViewLostData() throws Exception {
+        final int numBuffers = 10;
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+        HsResultPartition resultPartition = createHsResultPartition(2, bufferPool);
+        IOUtils.deleteFilesRecursively(tempDataPath); // presumably deletes the data file
+        assertThatThrownBy( // view creation must fail once the data file is unreadable
+                        () ->
+                                resultPartition.createSubpartitionView(
+                                        0, new NoOpBufferAvailablityListener()))
+                .isInstanceOf(PartitionNotFoundException.class);
+    }
+
     @Test
     void testAvailability() throws Exception {
         final int numBuffers = 2;