You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/07/21 03:56:51 UTC

[flink] branch master updated: [FLINK-28514][network] Remove data flush in SortMergeResultPartition

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a141441cb9 [FLINK-28514][network] Remove data flush in SortMergeResultPartition
9a141441cb9 is described below

commit 9a141441cb96837d92c4bd348b4c05617100e14b
Author: Tan Yuxin <ta...@gmail.com>
AuthorDate: Thu Jul 21 11:56:42 2022 +0800

    [FLINK-28514][network] Remove data flush in SortMergeResultPartition
    
    This closes #20294.
---
 .../partition/SortMergeResultPartition.java        | 18 +--------
 .../partition/SortMergeResultPartitionTest.java    | 45 +++-------------------
 2 files changed, 8 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 2f10b357877..26e648e3379 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -520,24 +520,10 @@ public class SortMergeResultPartition extends ResultPartition {
     }
 
     @Override
-    public void flushAll() {
-        try {
-            flushUnicastDataBuffer();
-            flushBroadcastDataBuffer();
-        } catch (IOException e) {
-            LOG.error("Failed to flush the current sort buffer.", e);
-        }
-    }
+    public void flushAll() {}
 
     @Override
-    public void flush(int subpartitionIndex) {
-        try {
-            flushUnicastDataBuffer();
-            flushBroadcastDataBuffer();
-        } catch (IOException e) {
-            LOG.error("Failed to flush the current sort buffer.", e);
-        }
-    }
+    public void flush(int subpartitionIndex) {}
 
     @Override
     public CompletableFuture<?> getAvailableFuture() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
index faea851f3d7..53cb37007a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
@@ -306,39 +306,6 @@ public class SortMergeResultPartitionTest extends TestLogger {
         assertEquals(dataSize, dataRead);
     }
 
-    @Test
-    public void testFlush() throws Exception {
-        int numBuffers = useHashDataBuffer ? 100 : 15;
-        int numWriteBuffers = useHashDataBuffer ? 0 : numBuffers / 2;
-        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
-        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
-        assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());
-
-        partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
-        partition.emitRecord(ByteBuffer.allocate(bufferSize), 1);
-        assertEquals(
-                (useHashDataBuffer ? 2 : 3) + numWriteBuffers,
-                bufferPool.bestEffortGetNumOfUsedBuffers());
-
-        partition.flush(0);
-        assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());
-
-        partition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
-        partition.emitRecord(ByteBuffer.allocate(bufferSize), 3);
-        assertEquals(
-                (useHashDataBuffer ? 2 : 3) + numWriteBuffers,
-                bufferPool.bestEffortGetNumOfUsedBuffers());
-
-        partition.flushAll();
-        assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());
-
-        assertNull(partition.getResultFile());
-        partition.finish();
-        assertEquals(3, partition.getResultFile().getNumRegions());
-
-        partition.close();
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testReleaseWhileWriting() throws Exception {
         int numBuffers = useHashDataBuffer ? 100 : 15;
@@ -465,16 +432,16 @@ public class SortMergeResultPartitionTest extends TestLogger {
 
         if (isBroadcast) {
             partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
-            partition.flushAll();
+            partition.finish();
 
-            assertEquals(bufferSize, partition.numBytesProduced.getCount());
-            assertEquals(numSubpartitions * bufferSize, partition.numBytesOut.getCount());
+            assertEquals(bufferSize + 4, partition.numBytesProduced.getCount());
+            assertEquals(numSubpartitions * (bufferSize + 4), partition.numBytesOut.getCount());
         } else {
             partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
-            partition.flushAll();
+            partition.finish();
 
-            assertEquals(bufferSize, partition.numBytesProduced.getCount());
-            assertEquals(bufferSize, partition.numBytesOut.getCount());
+            assertEquals(bufferSize + 4, partition.numBytesProduced.getCount());
+            assertEquals(bufferSize + numSubpartitions * 4, partition.numBytesOut.getCount());
         }
     }