You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/09 15:46:53 UTC

[flink] 02/02: [FLINK-28623][network] Optimize the use of off-heap memory by blocking and hybrid shuffle reader

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87d4f70e49255b551d0106117978b1aa0747358c
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 3 01:51:23 2022 +0800

    [FLINK-28623][network] Optimize the use of off-heap memory by blocking and hybrid shuffle reader
    
    Currently, each FileReader (PartitionedFileReader or HsSubpartitionFileReaderImpl) internally allocates an 8-byte headerBuffer, and PartitionedFileReader additionally allocates a 12-byte indexEntryBuf. Because FileReaders are created per subpartition, when the parallelism is very large and each TM has many slots, the memory occupied by these buffers can reach the MB level. In fact, all FileReaders of the same ResultPartition read data in a single thread, so we only need to allocate [...]
    
    This closes #20333.
---
 .../network/partition/PartitionedFileReader.java   | 23 ++++++++++--------
 .../SortMergeResultPartitionReadScheduler.java     | 26 ++++++++++++++++++--
 .../partition/hybrid/HsFileDataManager.java        |  7 +++++-
 .../partition/hybrid/HsSubpartitionFileReader.java |  4 +++-
 .../hybrid/HsSubpartitionFileReaderImpl.java       | 12 ++++++----
 .../partition/PartitionedFileWriteReadTest.java    | 28 +++++++++++++++++++---
 .../SortMergeResultPartitionReadSchedulerTest.java |  9 ++++++-
 .../partition/SortMergeSubpartitionReaderTest.java |  9 ++++++-
 .../partition/hybrid/HsFileDataManagerTest.java    |  4 +++-
 .../hybrid/HsSubpartitionFileReaderImplTest.java   |  3 ++-
 10 files changed, 100 insertions(+), 25 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
index b5a43bf6c42..7d268579423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
@@ -38,8 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** Reader which can read all data of the target subpartition from a {@link PartitionedFile}. */
 class PartitionedFileReader {
 
-    /** Used to read buffers from file channel. */
-    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
+    /** Used to read buffer headers from file channel. */
+    private final ByteBuffer headerBuf;
 
     /** Used to read index entry from index file. */
     private final ByteBuffer indexEntryBuf;
@@ -69,8 +69,9 @@ class PartitionedFileReader {
             PartitionedFile partitionedFile,
             int targetSubpartition,
             FileChannel dataFileChannel,
-            FileChannel indexFileChannel)
-            throws IOException {
+            FileChannel indexFileChannel,
+            ByteBuffer headerBuffer,
+            ByteBuffer indexEntryBuffer) {
         checkArgument(checkNotNull(dataFileChannel).isOpen(), "Data file channel must be opened.");
         checkArgument(
                 checkNotNull(indexFileChannel).isOpen(), "Index file channel must be opened.");
@@ -79,13 +80,11 @@ class PartitionedFileReader {
         this.targetSubpartition = targetSubpartition;
         this.dataFileChannel = dataFileChannel;
         this.indexFileChannel = indexFileChannel;
-
-        this.indexEntryBuf = ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
-        BufferReaderWriterUtil.configureByteBuffer(indexEntryBuf);
-        moveToNextReadableRegion();
+        this.headerBuf = headerBuffer;
+        this.indexEntryBuf = indexEntryBuffer;
     }
 
-    private void moveToNextReadableRegion() throws IOException {
+    private void moveToNextReadableRegion(ByteBuffer indexEntryBuf) throws IOException {
         while (currentRegionRemainingBytes <= 0
                 && nextRegionToRead < partitionedFile.getNumRegions()) {
             partitionedFile.getIndexEntry(
@@ -165,10 +164,14 @@ class PartitionedFileReader {
     }
 
     boolean hasRemaining() throws IOException {
-        moveToNextReadableRegion();
+        moveToNextReadableRegion(indexEntryBuf);
         return currentRegionRemainingBytes > 0;
     }
 
+    void initRegionIndex(ByteBuffer initIndexEntryBuffer) throws IOException {
+        moveToNextReadableRegion(initIndexEntryBuffer);
+    }
+
     /** Gets read priority of this file reader. Smaller value indicates higher priority. */
     long getPriority() {
         return nextOffsetToRead;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 510ea614aa5..1a5bbae4b54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -70,6 +71,17 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
      */
     private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5);
 
+    /** Used to read buffer headers from file channel. */
+    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
+
+    /** Used to read index entry for file reader initializing. */
+    private final ByteBuffer indexEntryBufferInit =
+            ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
+
+    /** Used to read index entry for file reader reading data. */
+    private final ByteBuffer indexEntryBufferRead =
+            ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
+
     /** Lock used to synchronize multi-thread access to thread-unsafe fields. */
     private final Object lock;
 
@@ -144,6 +156,8 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
         this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
+        BufferReaderWriterUtil.configureByteBuffer(indexEntryBufferInit);
+        BufferReaderWriterUtil.configureByteBuffer(indexEntryBufferRead);
     }
 
     @Override
@@ -355,8 +369,16 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler
             if (allReaders.isEmpty()) {
                 openFileChannels(resultFile);
             }
-            return new PartitionedFileReader(
-                    resultFile, targetSubpartition, dataFileChannel, indexFileChannel);
+            PartitionedFileReader partitionedFileReader =
+                    new PartitionedFileReader(
+                            resultFile,
+                            targetSubpartition,
+                            dataFileChannel,
+                            indexFileChannel,
+                            headerBuf,
+                            indexEntryBufferRead);
+            partitionedFileReader.initRegionIndex(indexEntryBufferInit);
+            return partitionedFileReader;
         } catch (Throwable throwable) {
             if (allReaders.isEmpty()) {
                 closeFileChannels();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 6070a838110..7e8debb8153 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.IOUtils;
 
@@ -32,6 +33,7 @@ import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -93,6 +95,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
 
     private final HybridShuffleConfiguration hybridShuffleConfiguration;
 
+    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
+
     /** All readers waiting to read data of different subpartitions. */
     @GuardedBy("lock")
     private final Set<HsSubpartitionFileReader> allReaders = new HashSet<>();
@@ -160,7 +164,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
                             operation,
                             dataIndex,
                             hybridShuffleConfiguration.getMaxBuffersReadAhead(),
-                            this::releaseSubpartitionReader);
+                            this::releaseSubpartitionReader,
+                            headerBuf);
 
             allReaders.add(subpartitionReader);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
index fbc044d6da0..2ec2a632281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Queue;
 import java.util.function.Consumer;
@@ -60,6 +61,7 @@ public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileR
                 HsSubpartitionViewInternalOperations operation,
                 HsFileDataIndex dataIndex,
                 int maxBuffersReadAhead,
-                Consumer<HsSubpartitionFileReader> fileReaderReleaser);
+                Consumer<HsSubpartitionFileReader> fileReaderReleaser,
+                ByteBuffer headerBuffer);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index 049eed5f8f6..de02a45582b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
 
-    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
+    private final ByteBuffer headerBuf;
 
     private final int subpartitionId;
 
@@ -75,10 +75,12 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
             HsSubpartitionViewInternalOperations operations,
             HsFileDataIndex dataIndex,
             int maxBufferReadAhead,
-            Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
+            Consumer<HsSubpartitionFileReader> fileReaderReleaser,
+            ByteBuffer headerBuf) {
         this.subpartitionId = subpartitionId;
         this.dataFileChannel = dataFileChannel;
         this.operations = operations;
+        this.headerBuf = headerBuf;
         this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead);
         this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex);
         this.fileReaderReleaser = fileReaderReleaser;
@@ -480,14 +482,16 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
                 HsSubpartitionViewInternalOperations operation,
                 HsFileDataIndex dataIndex,
                 int maxBuffersReadAhead,
-                Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
+                Consumer<HsSubpartitionFileReader> fileReaderReleaser,
+                ByteBuffer headerBuffer) {
             return new HsSubpartitionFileReaderImpl(
                     subpartitionId,
                     dataFileChannel,
                     operation,
                     dataIndex,
                     maxBuffersReadAhead,
-                    fileReaderReleaser);
+                    fileReaderReleaser,
+                    headerBuffer);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index 741fd3f1882..cbcaf758b37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -109,7 +110,12 @@ class PartitionedFileWriteReadTest {
         for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
             PartitionedFileReader fileReader =
                     new PartitionedFileReader(
-                            partitionedFile, subpartition, dataFileChannel, indexFileChannel);
+                            partitionedFile,
+                            subpartition,
+                            dataFileChannel,
+                            indexFileChannel,
+                            BufferReaderWriterUtil.allocatedHeaderBuffer(),
+                            createAndConfigIndexEntryBuffer());
             while (fileReader.hasRemaining()) {
                 final int subIndex = subpartition;
                 fileReader.readCurrentRegion(
@@ -187,7 +193,12 @@ class PartitionedFileWriteReadTest {
         for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
             PartitionedFileReader fileReader =
                     new PartitionedFileReader(
-                            partitionedFile, subpartition, dataFileChannel, indexFileChannel);
+                            partitionedFile,
+                            subpartition,
+                            dataFileChannel,
+                            indexFileChannel,
+                            BufferReaderWriterUtil.allocatedHeaderBuffer(),
+                            createAndConfigIndexEntryBuffer());
             int bufferIndex = 0;
             while (fileReader.hasRemaining()) {
                 final int subIndex = subpartition;
@@ -280,7 +291,12 @@ class PartitionedFileWriteReadTest {
         FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
         PartitionedFileReader partitionedFileReader =
                 new PartitionedFileReader(
-                        partitionedFile, targetSubpartition, dataFileChannel, indexFileChannel);
+                        partitionedFile,
+                        1,
+                        dataFileChannel,
+                        indexFileChannel,
+                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
+                        createAndConfigIndexEntryBuffer());
 
         partitionedFileReader.readCurrentRegion(
                 allocateBuffers(bufferSize),
@@ -313,4 +329,10 @@ class PartitionedFileWriteReadTest {
         partitionedFileWriter.finish();
         return partitionedFileWriter;
     }
+
+    public static ByteBuffer createAndConfigIndexEntryBuffer() {
+        ByteBuffer indexEntryBuffer = ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
+        BufferReaderWriterUtil.configureByteBuffer(indexEntryBuffer);
+        return indexEntryBuffer;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 26213d4d43d..a865f6c4915 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.flink.runtime.io.network.partition.PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -91,7 +92,13 @@ class SortMergeResultPartitionReadSchedulerTest {
         dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
         indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
         fileReader =
-                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
+                new PartitionedFileReader(
+                        partitionedFile,
+                        0,
+                        dataFileChannel,
+                        indexFileChannel,
+                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
+                        createAndConfigIndexEntryBuffer());
         bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
         executor = Executors.newFixedThreadPool(numThreads);
         readScheduler =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
index f899a0ea142..9552bb457c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
@@ -39,6 +39,7 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.Random;
 
+import static org.apache.flink.runtime.io.network.partition.PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -236,7 +237,13 @@ class SortMergeSubpartitionReaderTest {
     private SortMergeSubpartitionReader createSortMergeSubpartitionReader(
             BufferAvailabilityListener listener) throws Exception {
         PartitionedFileReader fileReader =
-                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
+                new PartitionedFileReader(
+                        partitionedFile,
+                        0,
+                        dataFileChannel,
+                        indexFileChannel,
+                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
+                        createAndConfigIndexEntryBuffer());
         assertThat(fileReader.hasRemaining()).isTrue();
         return new SortMergeSubpartitionReader(listener, fileReader);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 1a2e28959a6..4947f5e7776 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -431,7 +432,8 @@ class HsFileDataManagerTest {
                     HsSubpartitionViewInternalOperations operation,
                     HsFileDataIndex dataIndex,
                     int maxBuffersReadAhead,
-                    Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
+                    Consumer<HsSubpartitionFileReader> fileReaderReleaser,
+                    ByteBuffer headerBuffer) {
                 return checkNotNull(allReaders.poll());
             }
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 5782d5a510c..fcc51dbfa7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -501,7 +501,8 @@ class HsSubpartitionFileReaderImplTest {
                 operations,
                 diskIndex,
                 MAX_BUFFERS_READ_AHEAD,
-                (ignore) -> {});
+                (ignore) -> {},
+                BufferReaderWriterUtil.allocatedHeaderBuffer());
     }
 
     private static FileChannel openFileChannel(Path path) throws IOException {