You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/07/21 03:56:51 UTC
[flink] branch master updated: [FLINK-28514][network] Remove data flush in SortMergeResultPartition
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9a141441cb9 [FLINK-28514][network] Remove data flush in SortMergeResultPartition
9a141441cb9 is described below
commit 9a141441cb96837d92c4bd348b4c05617100e14b
Author: Tan Yuxin <ta...@gmail.com>
AuthorDate: Thu Jul 21 11:56:42 2022 +0800
[FLINK-28514][network] Remove data flush in SortMergeResultPartition
This closes #20294.
---
.../partition/SortMergeResultPartition.java | 18 +--------
.../partition/SortMergeResultPartitionTest.java | 45 +++-------------------
2 files changed, 8 insertions(+), 55 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 2f10b357877..26e648e3379 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -520,24 +520,10 @@ public class SortMergeResultPartition extends ResultPartition {
}
@Override
- public void flushAll() {
- try {
- flushUnicastDataBuffer();
- flushBroadcastDataBuffer();
- } catch (IOException e) {
- LOG.error("Failed to flush the current sort buffer.", e);
- }
- }
+ public void flushAll() {}
@Override
- public void flush(int subpartitionIndex) {
- try {
- flushUnicastDataBuffer();
- flushBroadcastDataBuffer();
- } catch (IOException e) {
- LOG.error("Failed to flush the current sort buffer.", e);
- }
- }
+ public void flush(int subpartitionIndex) {}
@Override
public CompletableFuture<?> getAvailableFuture() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
index faea851f3d7..53cb37007a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
@@ -306,39 +306,6 @@ public class SortMergeResultPartitionTest extends TestLogger {
assertEquals(dataSize, dataRead);
}
- @Test
- public void testFlush() throws Exception {
- int numBuffers = useHashDataBuffer ? 100 : 15;
- int numWriteBuffers = useHashDataBuffer ? 0 : numBuffers / 2;
- BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
- SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
- assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());
-
- partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
- partition.emitRecord(ByteBuffer.allocate(bufferSize), 1);
- assertEquals(
- (useHashDataBuffer ? 2 : 3) + numWriteBuffers,
- bufferPool.bestEffortGetNumOfUsedBuffers());
-
- partition.flush(0);
- assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());
-
- partition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
- partition.emitRecord(ByteBuffer.allocate(bufferSize), 3);
- assertEquals(
- (useHashDataBuffer ? 2 : 3) + numWriteBuffers,
- bufferPool.bestEffortGetNumOfUsedBuffers());
-
- partition.flushAll();
- assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());
-
- assertNull(partition.getResultFile());
- partition.finish();
- assertEquals(3, partition.getResultFile().getNumRegions());
-
- partition.close();
- }
-
@Test(expected = IllegalStateException.class)
public void testReleaseWhileWriting() throws Exception {
int numBuffers = useHashDataBuffer ? 100 : 15;
@@ -465,16 +432,16 @@ public class SortMergeResultPartitionTest extends TestLogger {
if (isBroadcast) {
partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
- partition.flushAll();
+ partition.finish();
- assertEquals(bufferSize, partition.numBytesProduced.getCount());
- assertEquals(numSubpartitions * bufferSize, partition.numBytesOut.getCount());
+ assertEquals(bufferSize + 4, partition.numBytesProduced.getCount());
+ assertEquals(numSubpartitions * (bufferSize + 4), partition.numBytesOut.getCount());
} else {
partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
- partition.flushAll();
+ partition.finish();
- assertEquals(bufferSize, partition.numBytesProduced.getCount());
- assertEquals(bufferSize, partition.numBytesOut.getCount());
+ assertEquals(bufferSize + 4, partition.numBytesProduced.getCount());
+ assertEquals(bufferSize + numSubpartitions * 4, partition.numBytesOut.getCount());
}
}