You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/26 14:37:57 UTC
[flink] branch master updated: [FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c643a2953ba [FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle.
c643a2953ba is described below
commit c643a2953ba44b3b316ba52983932329dc0162e4
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Aug 22 17:31:35 2022 +0800
[FLINK-29056] Throw PartitionNotFoundException if the partition file is not readable for hybrid shuffle.
This closes #20666
---
.../src/main/java/org/apache/flink/util/IOUtils.java | 17 +++++++++++++++++
.../io/network/partition/hybrid/HsResultPartition.java | 10 ++++++++++
.../network/partition/hybrid/HsResultPartitionTest.java | 15 +++++++++++++++
3 files changed, 42 insertions(+)
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 67e971de117..e269d93111d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.util;
import org.slf4j.Logger;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -297,6 +298,22 @@ public final class IOUtils {
}
}
+ /** Deletes all non-directory entries beneath the given directory; directories are kept. */
+ public static void deleteFilesRecursively(Path path) throws Exception {
+ File[] files = path.toFile().listFiles();
+ if (files == null || files.length == 0) {
+ return;
+ }
+
+ for (File file : files) {
+ if (!file.isDirectory()) {
+ Files.deleteIfExists(file.toPath());
+ } else {
+ deleteFilesRecursively(file.toPath());
+ }
+ }
+ }
+
/**
* Deletes the given file.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 308742909ea..5d537aa7466 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -43,6 +44,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ScheduledExecutorService;
@@ -175,6 +177,14 @@ public class HsResultPartition extends ResultPartition {
int subpartitionId, BufferAvailabilityListener availabilityListener)
throws IOException {
checkState(!isReleased(), "ResultPartition already released.");
+
+ // If the data file is not readable, throw PartitionNotFoundException to mark this result
+ // partition as failed. Otherwise, the partition data would not be regenerated and failover
+ // could not recover the job.
+ if (!Files.isReadable(dataFilePath)) {
+ throw new PartitionNotFoundException(getPartitionId());
+ }
+
HsSubpartitionView subpartitionView = new HsSubpartitionView(availabilityListener);
HsDataView diskDataView =
fileDataManager.registerNewSubpartition(subpartitionId, subpartitionView);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index b07bcc7e725..c1086b95f7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.util.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -268,6 +270,19 @@ class HsResultPartitionTest {
.isInstanceOf(IllegalStateException.class);
}
+ @Test
+ void testCreateSubpartitionViewLostData() throws Exception {
+ final int numBuffers = 10;
+ BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+ HsResultPartition resultPartition = createHsResultPartition(2, bufferPool);
+ IOUtils.deleteFilesRecursively(tempDataPath); // simulate lost partition data
+ assertThatThrownBy(
+ () ->
+ resultPartition.createSubpartitionView(
+ 0, new NoOpBufferAvailablityListener()))
+ .isInstanceOf(PartitionNotFoundException.class);
+ }
+
@Test
void testAvailability() throws Exception {
final int numBuffers = 2;