You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/02/04 02:54:24 UTC

[flink] branch release-1.13 updated: [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ca8dbcd  [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
ca8dbcd is described below

commit ca8dbcdcadd5262645761c340a89c86bfce7446f
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Wed Jan 26 11:06:46 2022 +0800

    [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
    
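    Currently, if the partition file of a blocking shuffle has been lost, a FileNotFoundException is thrown and the partition data is not regenerated. This change checks that the partition file is readable before creating a reader and throws a PartitionNotFoundException instead, so that the lost partition data can be regenerated.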
    
    This closes #18515.
---
 .../partition/BoundedBlockingSubpartition.java     |  5 ++
 .../io/network/partition/PartitionedFile.java      |  5 ++
 .../partition/SortMergeResultPartition.java        |  4 +
 .../flink/test/runtime/BlockingShuffleITCase.java  | 94 +++++++++++++++++++---
 4 files changed, 95 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 4656c75..4a22f1d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -213,6 +214,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
             checkState(!isReleased, "data partition already released");
             checkState(isFinished, "writing of blocking partition not yet finished");
 
+            if (!Files.isReadable(data.getFilePath())) {
+                throw new PartitionNotFoundException(parent.getPartitionId());
+            }
+
             final ResultSubpartitionView reader;
             if (useDirectFileTransfer) {
                 reader =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index dd34fe0..9f6018c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
 import java.nio.file.Path;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -109,6 +110,10 @@ public class PartitionedFile {
         return numRegions;
     }
 
+    public boolean isReadable() {
+        return Files.isReadable(dataFilePath) && Files.isReadable(indexFilePath);
+    }
+
     /**
      * Returns the index entry offset of the target region and subpartition in the index file. Both
      * region index and subpartition index start from 0.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 9382503..6335ddb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -442,6 +442,10 @@ public class SortMergeResultPartition extends ResultPartition {
             checkState(!isReleased(), "Partition released.");
             checkState(isFinished(), "Trying to read unfinished blocking partition.");
 
+            if (!resultFile.isReadable()) {
+                throw new PartitionNotFoundException(getPartitionId());
+            }
+
             return readScheduler.crateSubpartitionReader(
                     availabilityListener, subpartitionIndex, resultFile);
         }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index c1df574..9eddf7a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -19,19 +19,27 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 
@@ -44,53 +52,82 @@ public class BlockingShuffleITCase {
 
     private final int numSlotsPerTaskManager = 4;
 
+    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
     @Test
     public void testBoundedBlockingShuffle() throws Exception {
-        JobGraph jobGraph = createJobGraph(1000000);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(1000000, false);
+        Configuration configuration = getConfiguration();
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
     @Test
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
-        JobGraph jobGraph = createJobGraph(0);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(0, false);
+        Configuration configuration = getConfiguration();
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
     @Test
     public void testSortMergeBlockingShuffle() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
 
-        JobGraph jobGraph = createJobGraph(1000000);
+        JobGraph jobGraph = createJobGraph(1000000, false);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
     @Test
     public void testSortMergeBlockingShuffleWithoutData() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
 
-        JobGraph jobGraph = createJobGraph(0);
+        JobGraph jobGraph = createJobGraph(0, false);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+    }
+
+    @Test
+    public void testDeletePartitionFileOfBoundedBlockingShuffle() throws Exception {
+        Configuration configuration = getConfiguration();
+        JobGraph jobGraph = createJobGraph(0, true);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
-    private JobGraph createJobGraph(int numRecordsToSend) {
+    @Test
+    public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
+        Configuration configuration = getConfiguration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+
+        JobGraph jobGraph = createJobGraph(0, true);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+    }
+
+    private Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
+        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+        return configuration;
+    }
+
+    private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
         env.setBufferTimeout(-1);
         env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
         DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
         source.rebalance()
                 .map((MapFunction<String, String>) value -> value)
                 .broadcast()
-                .addSink(new VerifySink());
+                .addSink(new VerifySink(deletePartitionFile));
 
         StreamGraph streamGraph = env.getStreamGraph();
         streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
@@ -122,11 +159,42 @@ public class BlockingShuffleITCase {
         }
     }
 
-    private static class VerifySink implements SinkFunction<String> {
+    private static class VerifySink extends RichSinkFunction<String> {
+        private final boolean deletePartitionFile;
+
+        VerifySink(boolean deletePartitionFile) {
+            this.deletePartitionFile = deletePartitionFile;
+        }
 
         @Override
-        public void invoke(String value) throws Exception {
+        public void open(Configuration parameters) throws Exception {
+            if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
+                return;
+            }
+
+            synchronized (BlockingShuffleITCase.class) {
+                deleteFiles(TEMP_FOLDER.getRoot());
+            }
+        }
+
+        @Override
+        public void invoke(String value, Context context) throws Exception {
             assertEquals(RECORD, value);
         }
+
+        private static void deleteFiles(File root) throws IOException {
+            File[] files = root.listFiles();
+            if (files == null || files.length == 0) {
+                return;
+            }
+
+            for (File file : files) {
+                if (!file.isDirectory()) {
+                    Files.deleteIfExists(file.toPath());
+                } else {
+                    deleteFiles(file);
+                }
+            }
+        }
     }
 }