You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/02 01:57:17 UTC

[flink] 02/13: [hotfix] HsMemoryDataManager spillAsync's callback should assertNoException.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42f05f0cddd45b4ec54f1e7c52796e5e26cfbf6f
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 28 17:19:36 2022 +0800

    [hotfix] HsMemoryDataManager spillAsync's callback should assertNoException.
---
 .../io/network/partition/hybrid/HsMemoryDataManager.java  | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index b58338b6912..0f0744fe9ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
@@ -222,13 +223,13 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
                     // decrease numUnSpillBuffers as this subpartition's buffer is spill.
                     numUnSpillBuffers.getAndAdd(-bufferIndexAndChannels.size());
                 });
-
-        spiller.spillAsync(bufferWithIdentities)
-                .thenAccept(
-                        spilledBuffers -> {
-                            fileDataIndex.addBuffers(spilledBuffers);
-                            spillingCompleteFuture.complete(null);
-                        });
+        FutureUtils.assertNoException(
+                spiller.spillAsync(bufferWithIdentities)
+                        .thenAccept(
+                                spilledBuffers -> {
+                                    fileDataIndex.addBuffers(spilledBuffers);
+                                    spillingCompleteFuture.complete(null);
+                                }));
     }
 
     /**