Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/05 05:04:25 UTC

[flink] 02/02: [FLINK-28781] Hybrid Shuffle should support compression.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2edc43c6e469a3e16ee01a1373cf52523eb96b01
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 3 01:26:47 2022 +0800

    [FLINK-28781] Hybrid Shuffle should support compression.
    
    Compression is a useful feature for batch jobs: it can significantly reduce the disk load and the amount of data transferred over the network. Hybrid shuffle should therefore also support compressing spilled data, especially under the full spilling strategy.
    
    This closes #20426
---
 .../network/partition/ResultPartitionFactory.java  |  3 +-
 .../io/network/partition/ResultPartitionType.java  |  4 ++
 .../partition/consumer/SingleInputGateFactory.java |  2 +-
 .../partition/hybrid/HsMemoryDataManager.java      |  6 +-
 .../partition/hybrid/HsMemoryDataSpiller.java      | 35 ++++++++++-
 .../partition/hybrid/HsResultPartition.java        |  3 +-
 .../partition/hybrid/HsMemoryDataManagerTest.java  |  6 +-
 .../partition/hybrid/HsMemoryDataSpillerTest.java  | 40 ++++++++++---
 .../hybrid/HsSubpartitionFileReaderImplTest.java   | 67 ++++++++++++++++++----
 9 files changed, 137 insertions(+), 29 deletions(-)
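For readers trying the feature out: compression only takes effect when batch shuffle compression is enabled in the TaskManager configuration. Below is a minimal sketch of wiring this up through Flink's Configuration API. The option keys and the ALL_EXCHANGES_HYBRID_FULL mode name are assumptions from memory of this Flink version, not part of the commit; check ExecutionOptions and NettyShuffleEnvironmentOptions for the authoritative names.

    import org.apache.flink.configuration.Configuration;

    public class HybridShuffleCompressionConfig {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Route all exchanges through hybrid shuffle with the full spilling
            // strategy (key/value assumed; see ExecutionOptions.BATCH_SHUFFLE_MODE).
            conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_FULL");
            // Enable batch shuffle compression; with this commit the flag applies
            // to HYBRID partitions as well as BLOCKING ones (key assumed).
            conf.setString("taskmanager.network.batch-shuffle.compression.enabled", "true");
            // LZ4, LZO and ZSTD are the codecs exercised by the new tests below.
            conf.setString("taskmanager.network.compression.codec", "LZ4");
            System.out.println(conf);
        }
    }
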

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 73171089203..94cbf004109 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -139,8 +139,7 @@ public class ResultPartitionFactory {
             int maxParallelism,
             SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
         BufferCompressor bufferCompressor = null;
-        if (type.isBlockingOrBlockingPersistentResultPartition()
-                && batchShuffleCompressionEnabled) {
+        if (type.supportCompression() && batchShuffleCompressionEnabled) {
             bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index ee341f50535..c82e55a8321 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -196,4 +196,8 @@ public enum ResultPartitionType {
     public boolean isPersistent() {
         return isPersistent;
     }
+
+    public boolean supportCompression() {
+        return isBlockingOrBlockingPersistentResultPartition() || this == HYBRID;
+    }
 }
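The new predicate widens the set of partition types eligible for compression from the blocking variants to also include HYBRID. A quick sketch of what the gate now returns (HYBRID is taken from the commit; the other constant names are the long-standing ResultPartitionType values, cited from memory):

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

    public class SupportCompressionCheck {
        public static void main(String[] args) {
            // Blocking partitions already supported compression before this commit.
            System.out.println(ResultPartitionType.BLOCKING.supportCompression());  // true
            // HYBRID is newly admitted by the supportCompression() predicate above.
            System.out.println(ResultPartitionType.HYBRID.supportCompression());    // true
            // Pipelined (streaming) partitions remain uncompressed.
            System.out.println(ResultPartitionType.PIPELINED.supportCompression()); // false
        }
    }
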
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index 89091fcca7a..d7d4b0108c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -121,7 +121,7 @@ public class SingleInputGateFactory {
                 createBufferPoolFactory(networkBufferPool, floatingNetworkBuffersPerGate);
 
         BufferDecompressor bufferDecompressor = null;
-        if (igdd.getConsumedPartitionType().isBlockingOrBlockingPersistentResultPartition()
+        if (igdd.getConsumedPartitionType().supportCompression()
                 && batchShuffleCompressionEnabled) {
             bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 11228fe7206..b887a157cfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -76,11 +77,12 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
             BufferPool bufferPool,
             HsSpillingStrategy spillStrategy,
             HsFileDataIndex fileDataIndex,
-            Path dataFilePath)
+            Path dataFilePath,
+            BufferCompressor bufferCompressor)
             throws IOException {
         this.numSubpartitions = numSubpartitions;
         this.bufferPool = bufferPool;
-        this.spiller = new HsMemoryDataSpiller(dataFilePath);
+        this.spiller = new HsMemoryDataSpiller(dataFilePath, bufferCompressor);
         this.spillStrategy = spillStrategy;
         this.fileDataIndex = fileDataIndex;
         this.subpartitionMemoryDataManagers = new HsSubpartitionMemoryDataManager[numSubpartitions];
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
index e2f4cb06aba..b6a8a9b67c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
 import org.apache.flink.util.ExceptionUtils;
@@ -26,6 +27,8 @@ import org.apache.flink.util.FatalExitExceptionHandler;
 
 import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -38,6 +41,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This component is responsible for asynchronously writing in-memory data to disk. Each spilling
@@ -60,13 +66,17 @@ public class HsMemoryDataSpiller implements AutoCloseable {
     /** File channel to write data. */
     private final FileChannel dataFileChannel;
 
+    @Nullable private final BufferCompressor bufferCompressor;
+
     /** Records the current writing location. */
     private long totalBytesWritten;
 
-    public HsMemoryDataSpiller(Path dataFilePath) throws IOException {
+    public HsMemoryDataSpiller(Path dataFilePath, @Nullable BufferCompressor bufferCompressor)
+            throws IOException {
         this.dataFileChannel =
                 FileChannel.open(
                         dataFilePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+        this.bufferCompressor = bufferCompressor;
     }
 
     /**
@@ -88,6 +98,10 @@ public class HsMemoryDataSpiller implements AutoCloseable {
             List<BufferWithIdentity> toWrite,
             CompletableFuture<List<SpilledBuffer>> spilledFuture) {
         try {
+            toWrite =
+                    toWrite.stream()
+                            .map(this::compressBuffersIfPossible)
+                            .collect(Collectors.toList());
             List<SpilledBuffer> spilledBuffers = new ArrayList<>();
             long expectedBytes = createSpilledBuffersAndGetTotalBytes(toWrite, spilledBuffers);
             // write all buffers to file
@@ -178,4 +192,23 @@ public class HsMemoryDataSpiller implements AutoCloseable {
             ExceptionUtils.rethrow(e);
         }
     }
+
+    private BufferWithIdentity compressBuffersIfPossible(BufferWithIdentity bufferWithIdentity) {
+        Buffer buffer = bufferWithIdentity.getBuffer();
+        if (!canBeCompressed(buffer)) {
+            return bufferWithIdentity;
+        }
+
+        buffer = checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
+        return new BufferWithIdentity(
+                buffer, bufferWithIdentity.getBufferIndex(), bufferWithIdentity.getChannelIndex());
+    }
+
+    /**
+     * Whether the buffer can be compressed or not. Note that event is not compressed because it is
+     * usually small and the size can become even larger after compression.
+     */
+    private boolean canBeCompressed(Buffer buffer) {
+        return bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0;
+    }
 }
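The spiller compresses each data buffer in place before it is written, and the consuming side decompresses on read. A minimal sketch of that round trip, using the same BufferCompressor/BufferDecompressor calls the commit exercises (buffer construction mirrors the test helpers; codec and sizes are illustrative):

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.core.memory.MemorySegmentFactory;
    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
    import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
    import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
    import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;

    public class CompressionRoundTrip {
        public static void main(String[] args) {
            int bufferSize = 32 * 1024;
            // An all-zero segment compresses well; real payloads vary.
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
            Buffer buffer = new NetworkBuffer(
                    segment, FreeingBufferRecycler.INSTANCE,
                    Buffer.DataType.DATA_BUFFER, bufferSize);

            BufferCompressor compressor = new BufferCompressor(bufferSize, "LZ4");
            // Same call the spiller makes: compress back into the original segment.
            Buffer compressed = compressor.compressToOriginalBuffer(buffer);
            System.out.println(compressed.isCompressed() + " " + compressed.getSize());

            // The reading side mirrors HsSubpartitionFileReaderImplTest#checkData:
            // only decompress buffers that were actually compressed.
            BufferDecompressor decompressor = new BufferDecompressor(bufferSize, "LZ4");
            Buffer restored = compressed.isCompressed()
                    ? decompressor.decompressToIntermediateBuffer(compressed)
                    : compressed;
            System.out.println(restored.getSize());
        }
    }
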
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 100df9f7cff..31e257db8f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -123,7 +123,8 @@ public class HsResultPartition extends ResultPartition {
                         bufferPool,
                         getSpillingStrategy(hybridShuffleConfiguration),
                         dataIndex,
-                        dataFilePath);
+                        dataFilePath,
+                        bufferCompressor);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
index 97d16aad840..0bb65f3f15e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
@@ -201,7 +201,8 @@ class HsMemoryDataManagerTest {
                 bufferPool,
                 spillStrategy,
                 new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
-                dataFilePath);
+                dataFilePath,
+                null);
     }
 
     private HsMemoryDataManager createMemoryDataManager(
@@ -214,7 +215,8 @@ class HsMemoryDataManagerTest {
                 bufferPool,
                 spillStrategy,
                 fileDataIndex,
-                dataFilePath);
+                dataFilePath,
+                null);
     }
 
     private static ByteBuffer createRecord(int value) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
index e61465207a9..c0019a6c960 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
@@ -32,6 +33,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -58,19 +61,26 @@ class HsMemoryDataSpillerTest {
     private static final long BUFFER_WITH_HEADER_SIZE =
             BUFFER_SIZE + BufferReaderWriterUtil.HEADER_LENGTH;
 
-    private FileChannel readChannel;
-
     private HsMemoryDataSpiller memoryDataSpiller;
 
+    private @TempDir Path tempDir;
+
+    private Path dataFilePath;
+
     @BeforeEach
-    void before(@TempDir Path tempDir) throws Exception {
-        Path dataFilePath = tempDir.resolve(".data");
-        this.memoryDataSpiller = new HsMemoryDataSpiller(dataFilePath);
-        this.readChannel = FileChannel.open(dataFilePath, StandardOpenOption.READ);
+    void before() {
+        this.dataFilePath = tempDir.resolve(".data");
     }
 
-    @Test
-    void testSpillSuccessfully() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"LZ4", "LZO", "ZSTD", "NULL"})
+    void testSpillSuccessfully(String compressionFactoryName) throws Exception {
+        memoryDataSpiller =
+                createMemoryDataSpiller(
+                        dataFilePath,
+                        compressionFactoryName.equals("NULL")
+                                ? null
+                                : new BufferCompressor(BUFFER_SIZE, compressionFactoryName));
         List<BufferWithIdentity> bufferWithIdentityList = new ArrayList<>();
         bufferWithIdentityList.addAll(
                 createBufferWithIdentityList(
@@ -114,7 +124,8 @@ class HsMemoryDataSpillerTest {
     }
 
     @Test
-    void testClose() {
+    void testClose() throws Exception {
+        memoryDataSpiller = createMemoryDataSpiller(dataFilePath);
         List<BufferWithIdentity> bufferWithIdentityList = new ArrayList<>();
         bufferWithIdentityList.addAll(
                 createBufferWithIdentityList(
@@ -126,6 +137,7 @@ class HsMemoryDataSpillerTest {
 
     @Test
     void testRelease() throws Exception {
+        memoryDataSpiller = createMemoryDataSpiller(dataFilePath);
         List<BufferWithIdentity> bufferWithIdentityList =
                 new ArrayList<>(
                         createBufferWithIdentityList(
@@ -180,6 +192,7 @@ class HsMemoryDataSpillerTest {
     }
 
     private void checkData(List<Tuple2<Integer, Integer>> dataAndIndexes) throws Exception {
+        FileChannel readChannel = FileChannel.open(dataFilePath, StandardOpenOption.READ);
         ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
         MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
         for (Tuple2<Integer, Integer> dataAndIndex : dataAndIndexes) {
@@ -193,4 +206,13 @@ class HsMemoryDataSpillerTest {
             assertThat(buffer.getDataType().isEvent()).isEqualTo(dataAndIndex.f1 % 2 == 0);
         }
     }
+
+    private static HsMemoryDataSpiller createMemoryDataSpiller(Path dataFilePath) throws Exception {
+        return new HsMemoryDataSpiller(dataFilePath, null);
+    }
+
+    private static HsMemoryDataSpiller createMemoryDataSpiller(
+            Path dataFilePath, BufferCompressor bufferCompressor) throws Exception {
+        return new HsMemoryDataSpiller(dataFilePath, bufferCompressor);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index d25a5a34fb3..30535926437 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
@@ -37,6 +39,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -127,6 +131,27 @@ class HsSubpartitionFileReaderImplTest {
         checkData(fileReader2, 25);
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
+    void testReadBufferCompressed(String compressionFactoryName) throws Exception {
+        BufferCompressor bufferCompressor =
+                new BufferCompressor(bufferSize, compressionFactoryName);
+        BufferDecompressor bufferDecompressor =
+                new BufferDecompressor(bufferSize, compressionFactoryName);
+
+        diskIndex = new HsFileDataIndexImpl(1);
+        TestingSubpartitionViewInternalOperation viewNotifier =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReaderImpl fileReader1 = createSubpartitionFileReader(0, viewNotifier);
+
+        writeDataToFile(0, 0, 1, 3, bufferCompressor);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(3);
+
+        fileReader1.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+        checkData(fileReader1, bufferDecompressor, 1, 2, 3);
+    }
+
     @Test
     void testReadEmptyRegion() throws Exception {
         HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader();
@@ -441,22 +466,29 @@ class HsSubpartitionFileReaderImplTest {
                 .isEqualTo(DataType.DATA_BUFFER);
     }
 
-    private static void checkData(HsSubpartitionFileReaderImpl fileReader, int... expectedData) {
+    private static void checkData(
+            HsSubpartitionFileReaderImpl fileReader,
+            BufferDecompressor bufferDecompressor,
+            int... expectedData) {
         assertThat(fileReader.getLoadedBuffers()).hasSameSizeAs(expectedData);
         for (int data : expectedData) {
             BufferIndexOrError bufferIndexOrError = fileReader.getLoadedBuffers().poll();
             assertThat(bufferIndexOrError).isNotNull();
-            assertThat(bufferIndexOrError.getBuffer())
-                    .hasValueSatisfying(
-                            buffer ->
-                                    assertThat(
-                                                    buffer.getNioBufferReadable()
-                                                            .order(ByteOrder.nativeOrder())
-                                                            .getInt())
-                                            .isEqualTo(data));
+            assertThat(bufferIndexOrError.getBuffer()).isPresent();
+            Buffer buffer = bufferIndexOrError.getBuffer().get();
+            buffer =
+                    buffer.isCompressed() && bufferDecompressor != null
+                            ? bufferDecompressor.decompressToIntermediateBuffer(buffer)
+                            : buffer;
+            assertThat(buffer.getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt())
+                    .isEqualTo(data);
         }
     }
 
+    private static void checkData(HsSubpartitionFileReaderImpl fileReader, int... expectedData) {
+        checkData(fileReader, null, expectedData);
+    }
+
     private HsSubpartitionFileReaderImpl createSubpartitionFileReader() {
         return createSubpartitionFileReader(targetChannel, subpartitionOperation);
     }
@@ -485,7 +517,11 @@ class HsSubpartitionFileReaderImplTest {
     }
 
     private void writeDataToFile(
-            int subpartitionId, int firstBufferIndex, int firstBufferData, int numBuffers)
+            int subpartitionId,
+            int firstBufferIndex,
+            int firstBufferData,
+            int numBuffers,
+            BufferCompressor bufferCompressor)
             throws Exception {
         List<SpilledBuffer> spilledBuffers = new ArrayList<>(numBuffers);
         ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * numBuffers];
@@ -502,11 +538,14 @@ class HsSubpartitionFileReaderImplTest {
             Buffer buffer =
                     new NetworkBuffer(
                             segment, FreeingBufferRecycler.INSTANCE, dataType, bufferSize);
+            if (bufferCompressor != null && buffer.isBuffer()) {
+                buffer = bufferCompressor.compressToOriginalBuffer(buffer);
+            }
             setBufferWithHeader(buffer, bufferWithHeaders, 2 * i);
             spilledBuffers.add(
                     new SpilledBuffer(
                             subpartitionId, firstBufferIndex + i, currentFileOffset + totalBytes));
-            totalBytes += bufferSize + BufferReaderWriterUtil.HEADER_LENGTH;
+            totalBytes += buffer.getSize() + BufferReaderWriterUtil.HEADER_LENGTH;
         }
 
         BufferReaderWriterUtil.writeBuffers(dataFileChannel, totalBytes, bufferWithHeaders);
@@ -519,6 +558,12 @@ class HsSubpartitionFileReaderImplTest {
                         diskIndex.markBufferReadable(subpartitionId, spilledBuffer.bufferIndex));
     }
 
+    private void writeDataToFile(
+            int subpartitionId, int firstBufferIndex, int firstBufferData, int numBuffers)
+            throws Exception {
+        writeDataToFile(subpartitionId, firstBufferIndex, firstBufferData, numBuffers, null);
+    }
+
     private void writeDataToFile(int subpartitionId, int firstBufferIndex, int numBuffers)
             throws Exception {
         writeDataToFile(subpartitionId, firstBufferIndex, random.nextInt(), numBuffers);