You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2023/07/12 03:12:37 UTC

[flink] branch master updated (e6f77bea706 -> 2dfff436c09)

This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from e6f77bea706 [FLINK-32349][table] Support atomicity for CREATE TABLE AS SELECT(CTAS) statement (#22839)
     new 61d8fec84ad [hotfix][network] Fix the availability of tiered result partition
     new 2dfff436c09 [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/io/network/buffer/Buffer.java    | 12 ++++++++
 .../runtime/io/network/buffer/CompositeBuffer.java |  5 ++++
 .../io/network/buffer/FileRegionBuffer.java        |  5 ++++
 .../runtime/io/network/buffer/NetworkBuffer.java   |  7 ++++-
 .../buffer/ReadOnlySlicedNetworkBuffer.java        |  5 ++++
 .../tiered/shuffle/TieredResultPartition.java      |  5 ++++
 .../tiered/storage/SortBufferAccumulator.java      | 17 +++++++----
 .../tiered/storage/TieredStorageMemoryManager.java | 11 +++++++
 .../storage/TieredStorageMemoryManagerImpl.java    | 23 ++++++++++-----
 .../storage/TieredStorageProducerClient.java       |  4 +--
 .../hybrid/tiered/tier/TierProducerAgent.java      |  4 ++-
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  |  4 +--
 .../tiered/tier/disk/DiskTierProducerAgent.java    | 13 +++++++--
 .../tier/memory/MemoryTierProducerAgent.java       |  6 +++-
 .../tier/remote/RemoteTierProducerAgent.java       |  9 +++++-
 .../hybrid/tiered/TestingTierProducerAgent.java    |  3 +-
 .../tiered/TestingTieredStorageMemoryManager.java  | 21 +++++++++++++
 .../TieredStorageMemoryManagerImplTest.java        | 34 ++++++++++++++++++++++
 .../tier/disk/DiskTierProducerAgentTest.java       | 18 ++++++++----
 .../tier/memory/MemoryTierProducerAgentTest.java   |  8 +++--
 20 files changed, 182 insertions(+), 32 deletions(-)


[flink] 01/02: [hotfix][network] Fix the availability of tiered result partition

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61d8fec84ad8d976d2186182e84548bf9cbfcd44
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Mon Jul 10 13:18:17 2023 +0800

    [hotfix][network] Fix the availability of tiered result partition
---
 .../partition/hybrid/tiered/shuffle/TieredResultPartition.java       | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
index 8758a811ef4..605cb925fff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
@@ -189,6 +189,11 @@ public class TieredResultPartition extends ResultPartition {
         }
     }
 
+    @Override
+    public CompletableFuture<?> getAvailableFuture() {
+        return AVAILABLE;
+    }
+
     @Override
     public void alignedBarrierTimeout(long checkpointId) throws IOException {
         // Nothing to do.


[flink] 02/02: [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2dfff436c09821fb658bf8d289206b9ef85bb25b
Author: Yuxin Tan <ta...@gmail.com>
AuthorDate: Mon Jul 10 11:48:44 2023 +0800

    [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers
---
 .../flink/runtime/io/network/buffer/Buffer.java    | 12 ++++++++
 .../runtime/io/network/buffer/CompositeBuffer.java |  5 ++++
 .../io/network/buffer/FileRegionBuffer.java        |  5 ++++
 .../runtime/io/network/buffer/NetworkBuffer.java   |  7 ++++-
 .../buffer/ReadOnlySlicedNetworkBuffer.java        |  5 ++++
 .../tiered/storage/SortBufferAccumulator.java      | 17 +++++++----
 .../tiered/storage/TieredStorageMemoryManager.java | 11 +++++++
 .../storage/TieredStorageMemoryManagerImpl.java    | 23 ++++++++++-----
 .../storage/TieredStorageProducerClient.java       |  4 +--
 .../hybrid/tiered/tier/TierProducerAgent.java      |  4 ++-
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  |  4 +--
 .../tiered/tier/disk/DiskTierProducerAgent.java    | 13 +++++++--
 .../tier/memory/MemoryTierProducerAgent.java       |  6 +++-
 .../tier/remote/RemoteTierProducerAgent.java       |  9 +++++-
 .../hybrid/tiered/TestingTierProducerAgent.java    |  3 +-
 .../tiered/TestingTieredStorageMemoryManager.java  | 21 +++++++++++++
 .../TieredStorageMemoryManagerImplTest.java        | 34 ++++++++++++++++++++++
 .../tier/disk/DiskTierProducerAgentTest.java       | 18 ++++++++----
 .../tier/memory/MemoryTierProducerAgentTest.java   |  8 +++--
 19 files changed, 177 insertions(+), 32 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 3a83eff42cb..4dbefc97336 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -87,6 +87,18 @@ public interface Buffer {
      */
     BufferRecycler getRecycler();
 
+    /**
+     * Sets the buffer's recycler.
+     *
+     * <p>Note that updating the recycler is an unsafe operation and this method cannot guarantee
+     * thread safety. It is important for the caller to fully understand the consequences of calling
+     * this method. Incorrectly updating the buffer recycler can leak the buffer, because the wrong
+     * recycler would then be used to recycle it. Therefore, be careful when calling this method.
+     *
+     * @param bufferRecycler the new buffer recycler
+     */
+    void setRecycler(BufferRecycler bufferRecycler);
+
     /**
      * Releases this buffer once, i.e. reduces the reference count and recycles the buffer if the
      * reference count reaches <tt>0</tt>.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
index 7444704b0ad..ff6ebd1b5fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
@@ -171,6 +171,11 @@ public class CompositeBuffer implements Buffer {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void setRecycler(BufferRecycler bufferRecycler) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public boolean isRecycled() {
         throw new UnsupportedOperationException();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
index 0fbee3e6166..95949d54ed9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
@@ -178,6 +178,11 @@ public class FileRegionBuffer extends DefaultFileRegion implements Buffer {
         return null;
     }
 
+    @Override
+    public void setRecycler(BufferRecycler bufferRecycler) {
+        throw new UnsupportedOperationException("Method should never be called.");
+    }
+
     @Override
     public void recycleBuffer() {
         // nothing to do
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 4c48e12fe8a..71bae56fbbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -50,7 +50,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
     private final MemorySegment memorySegment;
 
     /** The recycler for the backing {@link MemorySegment}. */
-    private final BufferRecycler recycler;
+    private BufferRecycler recycler;
 
     /** The {@link DataType} this buffer represents. */
     private DataType dataType;
@@ -151,6 +151,11 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
         return recycler;
     }
 
+    @Override
+    public void setRecycler(BufferRecycler bufferRecycler) {
+        this.recycler = bufferRecycler;
+    }
+
     @Override
     public void recycleBuffer() {
         release();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index b40c4bea0a3..492385d0c7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -120,6 +120,11 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
         return getBuffer().getRecycler();
     }
 
+    @Override
+    public void setRecycler(BufferRecycler bufferRecycler) {
+        getBuffer().setRecycler(bufferRecycler);
+    }
+
     @Override
     public void recycleBuffer() {
         getBuffer().recycleBuffer();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java
index 0a6a8bddecb..e8223d41c16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
@@ -179,8 +180,7 @@ public class SortBufferAccumulator implements BufferAccumulator {
 
     private void requestBuffers() {
         while (freeSegments.size() < numBuffers) {
-            BufferBuilder bufferBuilder = memoryManager.requestBufferBlocking(this);
-            Buffer buffer = bufferBuilder.createBufferConsumerFromBeginning().build();
+            Buffer buffer = requestBuffer();
             freeSegments.add(checkNotNull(buffer).getMemorySegment());
             if (bufferRecycler == null) {
                 bufferRecycler = buffer.getRecycler();
@@ -237,9 +237,7 @@ public class SortBufferAccumulator implements BufferAccumulator {
     private MemorySegment getFreeSegment() {
         MemorySegment freeSegment = freeSegments.poll();
         if (freeSegment == null) {
-            BufferBuilder bufferBuilder = memoryManager.requestBufferBlocking(this);
-            Buffer buffer = bufferBuilder.createBufferConsumerFromBeginning().build();
-            freeSegment = buffer.getMemorySegment();
+            freeSegment = requestBuffer().getMemorySegment();
         }
         return freeSegment;
     }
@@ -251,6 +249,15 @@ public class SortBufferAccumulator implements BufferAccumulator {
                         Collections.singletonList(bufferWithChannel.getBuffer()));
     }
 
+    private Buffer requestBuffer() {
+        BufferBuilder bufferBuilder = memoryManager.requestBufferBlocking(this);
+        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning();
+        Buffer buffer = bufferConsumer.build();
+        bufferBuilder.close();
+        bufferConsumer.close();
+        return buffer;
+    }
+
     private void releaseFreeBuffers() {
         freeSegments.forEach(this::recycleBuffer);
         freeSegments.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
index 87835c61278..f3aa2a87eb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
@@ -100,6 +101,16 @@ public interface TieredStorageMemoryManager {
      */
     int numOwnerRequestedBuffer(Object owner);
 
+    /**
+     * Notifies the memory manager that one buffer's ownership is being transferred from the old
+     * owner to the new owner.
+     *
+     * @param oldOwner the old owner of one buffer
+     * @param newOwner the new owner of one buffer
+     * @param buffer the buffer to transfer the ownership
+     */
+    void transferBufferOwnership(Object oldOwner, Object newOwner, Buffer buffer);
+
     /**
      * Release all the resources(if exists) and check the state of the {@link
      * TieredStorageMemoryManager}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
index 46757489159..7eadf0b0908 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
@@ -83,7 +84,7 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
      * The number of requested buffers from {@link BufferPool} for each memory owner. This field
      * should be thread-safe because it can be touched both by the task thread and the netty thread.
      */
-    private final Map<Object, AtomicInteger> numOwnerRequestedBuffers;
+    private final Map<Object, Integer> numOwnerRequestedBuffers;
 
     /**
      * This is for triggering buffer reclaiming while blocked on requesting new buffers.
@@ -196,8 +197,15 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
 
     @Override
     public int numOwnerRequestedBuffer(Object owner) {
-        AtomicInteger numRequestedBuffer = numOwnerRequestedBuffers.get(owner);
-        return numRequestedBuffer == null ? 0 : numRequestedBuffer.get();
+        return numOwnerRequestedBuffers.getOrDefault(owner, 0);
+    }
+
+    @Override
+    public void transferBufferOwnership(Object oldOwner, Object newOwner, Buffer buffer) {
+        checkState(buffer.isBuffer(), "Only buffer supports transfer ownership.");
+        decNumRequestedBuffer(oldOwner);
+        incNumRequestedBuffer(newOwner);
+        buffer.setRecycler(memorySegment -> recycleBuffer(newOwner, memorySegment));
     }
 
     @Override
@@ -240,15 +248,14 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
     }
 
     private void incNumRequestedBuffer(Object owner) {
-        numOwnerRequestedBuffers
-                .computeIfAbsent(owner, ignore -> new AtomicInteger(0))
-                .incrementAndGet();
+        numOwnerRequestedBuffers.compute(
+                owner, (ignore, numRequested) -> numRequested == null ? 1 : numRequested + 1);
         numRequestedBuffers.incrementAndGet();
     }
 
     private void decNumRequestedBuffer(Object owner) {
-        AtomicInteger numOwnerRequestedBuffer = numOwnerRequestedBuffers.get(owner);
-        checkNotNull(numOwnerRequestedBuffer).decrementAndGet();
+        numOwnerRequestedBuffers.compute(
+                owner, (ignore, numRequested) -> checkNotNull(numRequested) - 1);
         numRequestedBuffers.decrementAndGet();
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 10d5bf09b13..60225387b1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -181,11 +181,11 @@ public class TieredStorageProducerClient {
         }
 
         if (!currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
-                subpartitionId, compressedBuffer)) {
+                subpartitionId, compressedBuffer, bufferAccumulator)) {
             chooseStorageTierToStartSegment(subpartitionId);
             checkState(
                     currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
-                            subpartitionId, compressedBuffer),
+                            subpartitionId, compressedBuffer, bufferAccumulator),
                     "Failed to write the first buffer to the new segment");
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java
index 5c12f301af6..676d38e21aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java
@@ -50,11 +50,13 @@ public interface TierProducerAgent extends AutoCloseable {
      *
      * @param subpartitionId the subpartition id that the buffer is writing to
      * @param finishedBuffer the writing buffer
+     * @param bufferOwner the current owner of this writing buffer
      * @return return true if the buffer is written successfully, return false if the current
      *     segment can not store this buffer and the current segment is finished. When returning
      *     false, the agent should try start a new segment before writing the buffer.
      */
-    boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer);
+    boolean tryWrite(
+            TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer, Object bufferOwner);
 
     /**
      * Close the agent.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
index 1caee6f745f..f5d7f30e0c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
@@ -51,7 +51,7 @@ class DiskCacheManager {
     DiskCacheManager(
             TieredStoragePartitionId partitionId,
             int numSubpartitions,
-            TieredStorageMemoryManager storageMemoryManager,
+            TieredStorageMemoryManager memoryManager,
             PartitionFileWriter partitionFileWriter) {
         this.partitionId = partitionId;
         this.numSubpartitions = numSubpartitions;
@@ -62,7 +62,7 @@ class DiskCacheManager {
         for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
             subpartitionCacheManagers[subpartitionId] = new SubpartitionDiskCacheManager();
         }
-        storageMemoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
+        memoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index 9df1cc2755e..21de0a8c6ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -63,6 +63,8 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
 
     private final float minReservedDiskSpaceFraction;
 
+    private final TieredStorageMemoryManager memoryManager;
+
     private final DiskCacheManager diskCacheManager;
 
     /**
@@ -89,7 +91,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
             boolean isBroadcastOnly,
             PartitionFileWriter partitionFileWriter,
             PartitionFileReader partitionFileReader,
-            TieredStorageMemoryManager storageMemoryManager,
+            TieredStorageMemoryManager memoryManager,
             TieredStorageNettyService nettyService,
             TieredStorageResourceRegistry resourceRegistry,
             BatchShuffleReadBufferPool bufferPool,
@@ -106,6 +108,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
         this.bufferSizeBytes = bufferSizeBytes;
         this.dataFilePath = dataFilePath;
         this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction;
+        this.memoryManager = memoryManager;
         this.firstBufferIndexInSegment = new ArrayList<>();
         this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
 
@@ -119,7 +122,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
                 new DiskCacheManager(
                         partitionId,
                         isBroadcastOnly ? 1 : numSubpartitions,
-                        storageMemoryManager,
+                        memoryManager,
                         partitionFileWriter);
 
         this.diskIOScheduler =
@@ -156,7 +159,8 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
     }
 
     @Override
-    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
+    public boolean tryWrite(
+            TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer, Object bufferOwner) {
         int subpartitionIndex = subpartitionId.getSubpartitionId();
         if (currentSubpartitionWriteBuffers[subpartitionIndex] != 0
                 && currentSubpartitionWriteBuffers[subpartitionIndex] + 1 > numBuffersPerSegment) {
@@ -164,6 +168,9 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
             currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
+        if (finishedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
+        }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
         emitBuffer(finishedBuffer, subpartitionIndex);
         return true;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
index c9317222d16..c371501d417 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
@@ -109,7 +109,8 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP
     }
 
     @Override
-    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
+    public boolean tryWrite(
+            TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer, Object bufferOwner) {
         int subpartitionIndex = subpartitionId.getSubpartitionId();
         if (currentSubpartitionWriteBuffers[subpartitionIndex] != 0
                 && currentSubpartitionWriteBuffers[subpartitionIndex] + 1 > numBuffersPerSegment) {
@@ -117,6 +118,9 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP
             currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
+        if (finishedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
+        }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
         addFinishedBuffer(finishedBuffer, subpartitionIndex);
         return true;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
index 9cae191c37c..d3a7c3e2841 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
@@ -39,6 +39,8 @@ public class RemoteTierProducerAgent implements TierProducerAgent {
 
     private final RemoteCacheManager cacheDataManager;
 
+    private final TieredStorageMemoryManager memoryManager;
+
     private final int[] currentSubpartitionSegmentWriteBuffers;
 
     RemoteTierProducerAgent(
@@ -56,6 +58,7 @@ public class RemoteTierProducerAgent implements TierProducerAgent {
 
         this.numSubpartitions = numSubpartitions;
         this.numBuffersPerSegment = numBytesPerSegment / bufferSizeBytes;
+        this.memoryManager = memoryManager;
         this.cacheDataManager =
                 new RemoteCacheManager(
                         partitionId,
@@ -75,13 +78,17 @@ public class RemoteTierProducerAgent implements TierProducerAgent {
     }
 
     @Override
-    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer buffer) {
+    public boolean tryWrite(
+            TieredStorageSubpartitionId subpartitionId, Buffer buffer, Object bufferOwner) {
         int subpartitionIndex = subpartitionId.getSubpartitionId();
         if (currentSubpartitionSegmentWriteBuffers[subpartitionIndex] + 1 > numBuffersPerSegment) {
             cacheDataManager.finishSegment(subpartitionIndex);
             currentSubpartitionSegmentWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
+        if (buffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(bufferOwner, this, buffer);
+        }
         currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++;
         cacheDataManager.appendBuffer(buffer, subpartitionIndex);
         return true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java
index 0dc623d8334..0c0b7d7c1fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java
@@ -53,7 +53,8 @@ public class TestingTierProducerAgent implements TierProducerAgent {
     }
 
     @Override
-    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
+    public boolean tryWrite(
+            TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer, Object bufferOwner) {
         return tryWriterFunction.apply(subpartitionId, finishedBuffer);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
index 84958ab6dde..fbab5c87541 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
 
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
+import org.apache.flink.util.function.TriConsumer;
 
 import java.util.List;
 import java.util.function.BiConsumer;
@@ -41,6 +43,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
 
     private final Function<Object, Integer> numOwnerRequestedBufferFunction;
 
+    private final TriConsumer<Object, Object, Buffer> transferBufferOwnershipConsumer;
+
     private final Runnable releaseRunnable;
 
     private TestingTieredStorageMemoryManager(
@@ -49,12 +53,14 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
             Function<Object, BufferBuilder> requestBufferBlockingFunction,
             Function<Object, Integer> getMaxNonReclaimableBuffersFunction,
             Function<Object, Integer> numOwnerRequestedBufferFunction,
+            TriConsumer<Object, Object, Buffer> transferBufferOwnershipConsumer,
             Runnable releaseRunnable) {
         this.setupConsumer = setupConsumer;
         this.listenBufferReclaimRequestConsumer = listenBufferReclaimRequestConsumer;
         this.requestBufferBlockingFunction = requestBufferBlockingFunction;
         this.getMaxNonReclaimableBuffersFunction = getMaxNonReclaimableBuffersFunction;
         this.numOwnerRequestedBufferFunction = numOwnerRequestedBufferFunction;
+        this.transferBufferOwnershipConsumer = transferBufferOwnershipConsumer;
         this.releaseRunnable = releaseRunnable;
     }
 
@@ -83,6 +89,11 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
         return numOwnerRequestedBufferFunction.apply(owner);
     }
 
+    @Override
+    public void transferBufferOwnership(Object oldOwner, Object newOwner, Buffer buffer) {
+        transferBufferOwnershipConsumer.accept(oldOwner, newOwner, buffer);
+    }
+
     @Override
     public void release() {
         releaseRunnable.run();
@@ -102,6 +113,9 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
 
         private Function<Object, Integer> numOwnerRequestedBufferFunction = owner -> 0;
 
+        private TriConsumer<Object, Object, Buffer> transferBufferOwnershipConsumer =
+                (oldOwner, newOwner, buffer) -> {};
+
         private Runnable releaseRunnable = () -> {};
 
         public Builder() {}
@@ -136,6 +150,12 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
             return this;
         }
 
+        public TestingTieredStorageMemoryManager.Builder setTransferBufferOwnershipConsumer(
+                TriConsumer<Object, Object, Buffer> transferBufferOwnershipConsumer) {
+            this.transferBufferOwnershipConsumer = transferBufferOwnershipConsumer;
+            return this;
+        }
+
         public TestingTieredStorageMemoryManager.Builder setReleaseRunnable(
                 Runnable releaseRunnable) {
             this.releaseRunnable = releaseRunnable;
@@ -149,6 +169,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
                     requestBufferBlockingFunction,
                     getMaxNonReclaimableBuffersFunction,
                     numOwnerRequestedBufferFunction,
+                    transferBufferOwnershipConsumer,
                     releaseRunnable);
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
index e4591bd0021..0568832c7e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
@@ -208,6 +209,39 @@ public class TieredStorageMemoryManagerImplTest {
         storageMemoryManager.release();
     }
 
+    @Test
+    void testTransferBufferOwnership() throws IOException {
+        TieredStorageMemoryManagerImpl memoryManager =
+                createStorageMemoryManager(
+                        1, Collections.singletonList(new TieredStorageMemorySpec(this, 0)));
+        BufferBuilder bufferBuilder = memoryManager.requestBufferBlocking(this);
+        assertThat(memoryManager.numOwnerRequestedBuffer(this)).isEqualTo(1);
+
+        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning();
+        Buffer buffer = bufferConsumer.build();
+        bufferBuilder.close();
+        bufferConsumer.close();
+        Object newOwner = new Object();
+        memoryManager.transferBufferOwnership(this, newOwner, buffer);
+        assertThat(memoryManager.numOwnerRequestedBuffer(this)).isEqualTo(0);
+        assertThat(memoryManager.numOwnerRequestedBuffer(newOwner)).isEqualTo(1);
+        buffer.recycleBuffer();
+        assertThat(memoryManager.numOwnerRequestedBuffer(newOwner)).isEqualTo(0);
+    }
+
+    @Test
+    void testCanNotTransferOwnershipForEvent() throws IOException {
+        TieredStorageMemoryManagerImpl memoryManager =
+                createStorageMemoryManager(
+                        1, Collections.singletonList(new TieredStorageMemorySpec(this, 0)));
+        BufferConsumer bufferConsumer =
+                BufferBuilderTestUtils.createEventBufferConsumer(1, Buffer.DataType.EVENT_BUFFER);
+        Buffer buffer = bufferConsumer.build();
+        bufferConsumer.close();
+        assertThatThrownBy(() -> memoryManager.transferBufferOwnership(this, new Object(), buffer))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
     @Test
     void testReleaseBeforeRecyclingBuffers() throws IOException {
         int numBuffers = 5;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 636f8d0095b..3daf34c4a97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -107,7 +107,9 @@ public class DiskTierProducerAgentTest {
             diskTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0);
             assertThat(
                             diskTierProducerAgent.tryWrite(
-                                    SUBPARTITION_ID, BufferBuilderTestUtils.buildSomeBuffer()))
+                                    SUBPARTITION_ID,
+                                    BufferBuilderTestUtils.buildSomeBuffer(),
+                                    this))
                     .isTrue();
         }
     }
@@ -125,12 +127,15 @@ public class DiskTierProducerAgentTest {
             diskTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0);
             assertThat(
                             diskTierProducerAgent.tryWrite(
-                                    SUBPARTITION_ID, BufferBuilderTestUtils.buildSomeBuffer()))
+                                    SUBPARTITION_ID,
+                                    BufferBuilderTestUtils.buildSomeBuffer(),
+                                    this))
                     .isTrue();
             assertThat(
                             diskTierProducerAgent.tryWrite(
                                     SUBPARTITION_ID,
-                                    BufferBuilderTestUtils.buildSomeBuffer(BUFFER_SIZE_BYTES)))
+                                    BufferBuilderTestUtils.buildSomeBuffer(BUFFER_SIZE_BYTES),
+                                    this))
                     .isFalse();
         }
     }
@@ -150,11 +155,14 @@ public class DiskTierProducerAgentTest {
                             () ->
                                     diskTierProducerAgent.tryWrite(
                                             new TieredStorageSubpartitionId(1),
-                                            BufferBuilderTestUtils.buildSomeBuffer()))
+                                            BufferBuilderTestUtils.buildSomeBuffer(),
+                                            this))
                     .isInstanceOf(ArrayIndexOutOfBoundsException.class);
             assertThat(
                             diskTierProducerAgent.tryWrite(
-                                    SUBPARTITION_ID, BufferBuilderTestUtils.buildSomeBuffer()))
+                                    SUBPARTITION_ID,
+                                    BufferBuilderTestUtils.buildSomeBuffer(),
+                                    this))
                     .isTrue();
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
index dd6c24addec..09e4e8c5735 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
@@ -97,11 +97,15 @@ class MemoryTierProducerAgentTest {
                     SUBPARTITION_ID, new TestingNettyConnectionWriter.Builder().build());
             assertThat(
                             memoryTierProducerAgent.tryWrite(
-                                    SUBPARTITION_ID, BufferBuilderTestUtils.buildSomeBuffer()))
+                                    SUBPARTITION_ID,
+                                    BufferBuilderTestUtils.buildSomeBuffer(),
+                                    this))
                     .isTrue();
             assertThat(
                             memoryTierProducerAgent.tryWrite(
-                                    SUBPARTITION_ID, BufferBuilderTestUtils.buildSomeBuffer()))
+                                    SUBPARTITION_ID,
+                                    BufferBuilderTestUtils.buildSomeBuffer(),
+                                    this))
                     .isFalse();
         }
     }