You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/07/26 09:02:43 UTC

[flink] 01/02: [FLINK-28448] Fix bugs in BoundedDataTestBase when compression is enabled

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a4ad7814009f3e799f198d54f24a055894b1ebce
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Jul 7 23:25:45 2022 +0800

    [FLINK-28448] Fix bugs in BoundedDataTestBase when compression is enabled
    
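    1. If compression is enabled, the intermediate buffer's reference count must be decremented after the buffer has been written to the file.
    2. The amount of data in a buffer was calculated from the compressed buffer size, which is wrong; it must be calculated from the decompressed buffer size.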
    
    This closes #20203.
---
 .../runtime/io/network/partition/BoundedDataTestBase.java  | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
index 8fc1913668f..401ff02163f 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
@@ -190,11 +190,17 @@ public abstract class BoundedDataTestBase {
                     BufferBuilderTestUtils.buildBufferWithAscendingInts(
                             BUFFER_SIZE, numIntsInBuffer, nextValue);
             if (compressionEnabled) {
-                bd.writeBuffer(COMPRESSOR.compressToIntermediateBuffer(buffer));
+                Buffer compressedBuffer = COMPRESSOR.compressToIntermediateBuffer(buffer);
+                bd.writeBuffer(compressedBuffer);
+                // recycle intermediate buffer.
+                if (compressedBuffer != buffer) {
+                    compressedBuffer.recycleBuffer();
+                }
             } else {
                 bd.writeBuffer(buffer);
             }
             numBuffers++;
+            buffer.recycleBuffer();
         }
 
         return numBuffers;
@@ -206,13 +212,17 @@ public abstract class BoundedDataTestBase {
         int nextValue = 0;
         int numBuffers = 0;
 
+        int numIntsInBuffer;
         while ((b = reader.nextBuffer()) != null) {
-            final int numIntsInBuffer = b.getSize() / 4;
             if (compressionEnabled && b.isCompressed()) {
                 Buffer decompressedBuffer = DECOMPRESSOR.decompressToIntermediateBuffer(b);
+                numIntsInBuffer = decompressedBuffer.getSize() / 4;
                 BufferBuilderTestUtils.validateBufferWithAscendingInts(
                         decompressedBuffer, numIntsInBuffer, nextValue);
+                // recycle intermediate buffer.
+                decompressedBuffer.recycleBuffer();
             } else {
+                numIntsInBuffer = b.getSize() / 4;
                 BufferBuilderTestUtils.validateBufferWithAscendingInts(
                         b, numIntsInBuffer, nextValue);
             }