You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/07/26 01:50:16 UTC
[flink] 03/04: [hotfix] HsMemoryDataSpiller's spilling thread will trigger a fatal error when an exception is encountered.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0610e16ae17ae94c1ec104906945fc86ad3addeb
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 21 22:12:55 2022 +0800
[hotfix] HsMemoryDataSpiller's spilling thread will trigger a fatal error when an exception is encountered.
---
.../partition/hybrid/HsMemoryDataSpiller.java | 17 ++++++++++++++---
.../partition/hybrid/HsMemoryDataSpillerTest.java | 21 ---------------------
2 files changed, 14 insertions(+), 24 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
index e0909c53b32..dd225ba6b27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -41,7 +43,15 @@ public class HsMemoryDataSpiller implements AutoCloseable {
/** One thread to perform spill operation. */
private final ExecutorService ioExecutor =
Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("hybrid spiller thread").build());
+ new ThreadFactoryBuilder()
+ .setNameFormat("hybrid spiller thread")
+ // It is more appropriate to use task fail over than exit JVM here,
+ // but the task thread will bring some extra overhead to check the
+ // exception information set by other thread. As the spiller thread will
+ // not encounter exceptions in most cases, we temporarily choose the
+ // form of fatal error to deal except thrown by spiller thread.
+ .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+ .build());
/** File channel to write data. */
private final FileChannel dataFileChannel;
@@ -81,8 +91,9 @@ public class HsMemoryDataSpiller implements AutoCloseable {
// which controls data's life cycle.
// TODO update file data index and handle buffers release in future ticket.
spilledFuture.complete(spilledBuffers);
- } catch (Throwable t) {
- spilledFuture.completeExceptionally(t);
+ } catch (IOException exception) {
+ // if spilling is failed, throw exception directly to uncaughtExceptionHandler.
+ ExceptionUtils.rethrow(exception);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
index 993887c6ed3..b31f522f799 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
@@ -33,10 +33,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -46,7 +44,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@@ -74,24 +71,6 @@ class HsMemoryDataSpillerTest {
this.memoryDataSpiller = new HsMemoryDataSpiller(dataFileChannel);
}
- @Test
- void testSpillExceptionally() throws IOException {
- int targetChannel = 0;
- List<BufferWithIdentity> bufferWithIdentityList =
- createBufferWithIdentityList(
- targetChannel,
- Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2)));
- // close data file channel to trigger exception when spill data into disk.
- dataFileChannel.close();
-
- CompletableFuture<List<SpilledBuffer>> future =
- memoryDataSpiller.spillAsync(bufferWithIdentityList);
- assertThat(future)
- .failsWithin(60, TimeUnit.SECONDS)
- .withThrowableOfType(ExecutionException.class)
- .withCauseInstanceOf(ClosedChannelException.class);
- }
-
@Test
void testSpillSuccessfully() throws Exception {
List<BufferWithIdentity> bufferWithIdentityList = new ArrayList<>();