You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/07/26 01:50:16 UTC

[flink] 03/04: [hotfix] HsMemoryDataSpiller's spilling thread will trigger a fatal error when an exception is encountered.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0610e16ae17ae94c1ec104906945fc86ad3addeb
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 21 22:12:55 2022 +0800

    [hotfix] HsMemoryDataSpiller's spilling thread will trigger a fatal error when an exception is encountered.
---
 .../partition/hybrid/HsMemoryDataSpiller.java       | 17 ++++++++++++++---
 .../partition/hybrid/HsMemoryDataSpillerTest.java   | 21 ---------------------
 2 files changed, 14 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
index e0909c53b32..dd225ba6b27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
 
 import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -41,7 +43,15 @@ public class HsMemoryDataSpiller implements AutoCloseable {
     /** One thread to perform spill operation. */
     private final ExecutorService ioExecutor =
             Executors.newSingleThreadScheduledExecutor(
-                    new ThreadFactoryBuilder().setNameFormat("hybrid spiller thread").build());
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("hybrid spiller thread")
+                            // Task failover would be more appropriate than exiting the JVM here,
+                            // but the task thread would then need extra overhead to check for
+                            // exceptions reported by another thread. As the spiller thread rarely
+                            // encounters exceptions, we temporarily treat exceptions thrown by the
+                            // spiller thread as fatal errors.
+                            .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                            .build());
 
     /** File channel to write data. */
     private final FileChannel dataFileChannel;
@@ -81,8 +91,9 @@ public class HsMemoryDataSpiller implements AutoCloseable {
             // which controls data's life cycle.
             // TODO update file data index and handle buffers release in future ticket.
             spilledFuture.complete(spilledBuffers);
-        } catch (Throwable t) {
-            spilledFuture.completeExceptionally(t);
+        } catch (IOException exception) {
+            // If spilling fails, rethrow so the exception reaches the uncaughtExceptionHandler.
+            ExceptionUtils.rethrow(exception);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
index 993887c6ed3..b31f522f799 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
@@ -33,10 +33,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -46,7 +44,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -74,24 +71,6 @@ class HsMemoryDataSpillerTest {
         this.memoryDataSpiller = new HsMemoryDataSpiller(dataFileChannel);
     }
 
-    @Test
-    void testSpillExceptionally() throws IOException {
-        int targetChannel = 0;
-        List<BufferWithIdentity> bufferWithIdentityList =
-                createBufferWithIdentityList(
-                        targetChannel,
-                        Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2)));
-        // close data file channel to trigger exception when spill data into disk.
-        dataFileChannel.close();
-
-        CompletableFuture<List<SpilledBuffer>> future =
-                memoryDataSpiller.spillAsync(bufferWithIdentityList);
-        assertThat(future)
-                .failsWithin(60, TimeUnit.SECONDS)
-                .withThrowableOfType(ExecutionException.class)
-                .withCauseInstanceOf(ClosedChannelException.class);
-    }
-
     @Test
     void testSpillSuccessfully() throws Exception {
         List<BufferWithIdentity> bufferWithIdentityList = new ArrayList<>();