You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/02 11:17:53 UTC

[iotdb] branch 4017 created (now b0972de859)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch 4017
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at b0972de859 Fix Error happened while calling onAcknowledgeDataBlockEvent

This branch includes the following new commits:

     new b0972de859 Fix Error happened while calling onAcknowledgeDataBlockEvent

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Fix Error happened while calling onAcknowledgeDataBlockEvent

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch 4017
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b0972de8591acb75eb18b4f50fc329dc17663c56
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Aug 2 18:04:00 2022 +0800

    Fix Error happened while calling onAcknowledgeDataBlockEvent
---
 .../iotdb/db/mpp/execution/driver/Driver.java      |  3 +-
 .../db/mpp/execution/exchange/ISinkHandle.java     |  2 +-
 .../db/mpp/execution/exchange/LocalSinkHandle.java | 10 ++--
 .../db/mpp/execution/exchange/SinkHandle.java      | 52 +++++++++-------
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  |  2 +-
 .../execution/exchange/LocalSinkHandleTest.java    |  6 +-
 .../db/mpp/execution/exchange/SinkHandleTest.java  | 70 +++++++++++++---------
 .../db/mpp/execution/exchange/StubSinkHandle.java  |  4 +-
 .../iotdb/db/mpp/execution/exchange/Utils.java     | 11 +++-
 9 files changed, 93 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index acf8e89470..3462b0fc7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -186,7 +185,7 @@ public abstract class Driver implements IDriver {
       if (root.hasNext()) {
         TsBlock tsBlock = root.next();
         if (tsBlock != null && !tsBlock.isEmpty()) {
-          sinkHandle.send(Collections.singletonList(tsBlock));
+          sinkHandle.send(tsBlock);
         }
       }
       return NOT_BLOCKED;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 4450ca8c21..7bc5fd1934 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -41,7 +41,7 @@ public interface ISinkHandle {
    * the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
    * will be thrown if any exception happened during the data transmission.
    */
-  void send(List<TsBlock> tsBlocks);
+  void send(TsBlock tsBlocks);
 
   /**
    * Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index d6960e7664..aaba4b8d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -100,8 +100,8 @@ public class LocalSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) {
-    Validate.notNull(tsBlocks, "tsBlocks is null");
+  public void send(TsBlock tsBlock) {
+    Validate.notNull(tsBlock, "tsBlocks is null");
     synchronized (this) {
       checkState();
       if (!blocked.isDone()) {
@@ -113,11 +113,9 @@ public class LocalSinkHandle implements ISinkHandle {
       if (queue.hasNoMoreTsBlocks()) {
         return;
       }
-      logger.info("send TsBlocks. Size: {}", tsBlocks.size());
+      logger.info("send TsBlocks");
       synchronized (this) {
-        for (TsBlock tsBlock : tsBlocks) {
-          blocked = queue.add(tsBlock);
-        }
+        blocked = queue.add(tsBlock);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index e8247cc621..872720fb7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.concurrent.SetThreadName;
@@ -46,9 +47,9 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 
-import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SinkHandle implements ISinkHandle {
 
@@ -71,15 +72,19 @@ public class SinkHandle implements ISinkHandle {
   // Use LinkedHashMap to meet 2 needs,
   //   1. Predictable iteration order so that removing buffered tsblocks can be efficient.
   //   2. Fast lookup.
-  private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap<>();
+  private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock =
+      new LinkedHashMap<>();
+
+  // size for current TsBlock to reserve and free
+  private long currentTsBlockSize;
 
   private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
       mppDataExchangeServiceClientManager;
 
-  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private volatile ListenableFuture<Void> blocked;
   private int nextSequenceId = 0;
   /** The actual buffered memory in bytes, including the amount of memory being reserved. */
-  private long bufferRetainedSizeInBytes = 0;
+  private long bufferRetainedSizeInBytes;
 
   private boolean aborted = false;
 
@@ -109,6 +114,12 @@ public class SinkHandle implements ISinkHandle {
     this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
     this.threadName = createFullIdFrom(localFragmentInstanceId, "SinkHandle");
+    this.blocked =
+        localMemoryManager
+            .getQueryPool()
+            .reserve(localFragmentInstanceId.getQueryId(), DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
@@ -122,8 +133,8 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public synchronized void send(List<TsBlock> tsBlocks) {
-    Validate.notNull(tsBlocks, "tsBlocks is null");
+  public synchronized void send(TsBlock tsBlock) {
+    Validate.notNull(tsBlock, "tsBlocks is null");
     checkState();
     if (!blocked.isDone()) {
       throw new IllegalStateException("Sink handle is blocked.");
@@ -131,10 +142,7 @@ public class SinkHandle implements ISinkHandle {
     if (noMoreTsBlocks) {
       return;
     }
-    long retainedSizeInBytes = 0L;
-    for (TsBlock tsBlock : tsBlocks) {
-      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
-    }
+    long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
     int startSequenceId;
     List<Long> tsBlockSizes = new ArrayList<>();
     startSequenceId = nextSequenceId;
@@ -143,12 +151,13 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
     bufferRetainedSizeInBytes += retainedSizeInBytes;
-    for (TsBlock tsBlock : tsBlocks) {
-      sequenceIdToTsBlock.put(nextSequenceId, tsBlock);
-      nextSequenceId += 1;
-    }
+
+    sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
+    nextSequenceId += 1;
+    currentTsBlockSize = retainedSizeInBytes;
+
     for (int i = startSequenceId; i < nextSequenceId; i++) {
-      tsBlockSizes.add(sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
+      tsBlockSizes.add(sequenceIdToTsBlock.get(i).left.getRetainedSizeInBytes());
     }
 
     // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
@@ -198,6 +207,7 @@ public class SinkHandle implements ISinkHandle {
     }
     logger.info("set noMoreTsBlocks to true");
     noMoreTsBlocks = true;
+
     if (isFinished()) {
       logger.info("revoke onFinish() of sinkHandleListener");
       sinkHandleListener.onFinish(this);
@@ -263,7 +273,7 @@ public class SinkHandle implements ISinkHandle {
 
   ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
     TsBlock tsBlock;
-    tsBlock = sequenceIdToTsBlock.get(sequenceId);
+    tsBlock = sequenceIdToTsBlock.get(sequenceId).left;
     if (tsBlock == null) {
       throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
     }
@@ -276,17 +286,19 @@ public class SinkHandle implements ISinkHandle {
       if (aborted) {
         return;
       }
-      Iterator<Entry<Integer, TsBlock>> iterator = sequenceIdToTsBlock.entrySet().iterator();
+      Iterator<Entry<Integer, Pair<TsBlock, Long>>> iterator =
+          sequenceIdToTsBlock.entrySet().iterator();
       while (iterator.hasNext()) {
-        Entry<Integer, TsBlock> entry = iterator.next();
+        Entry<Integer, Pair<TsBlock, Long>> entry = iterator.next();
         if (entry.getKey() < startSequenceId) {
           continue;
         }
         if (entry.getKey() >= endSequenceId) {
           break;
         }
-        freedBytes += entry.getValue().getRetainedSizeInBytes();
-        bufferRetainedSizeInBytes -= entry.getValue().getRetainedSizeInBytes();
+
+        freedBytes += entry.getValue().right;
+        bufferRetainedSizeInBytes -= entry.getValue().right;
         iterator.remove();
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index e3f4d0fc66..7b24d32c13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -195,7 +195,7 @@ public class MemoryPool {
     Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
     while (iterator.hasNext()) {
       MemoryReservationFuture<Void> future = iterator.next();
-      if (future.isCancelled()) {
+      if (future.isCancelled() || future.isDone()) {
         continue;
       }
       long bytesToReserve = future.getBytes();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 422139a959..1c3cb4e659 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -29,8 +29,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.Collections;
-
 public class LocalSinkHandleTest {
   @Test
   public void testSend() {
@@ -68,7 +66,7 @@ public class LocalSinkHandleTest {
     // Send TsBlocks.
     int numOfSentTsblocks = 0;
     while (localSinkHandle.isFull().isDone()) {
-      localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+      localSinkHandle.send(Utils.createMockTsBlock(mockTsBlockSize));
       numOfSentTsblocks += 1;
     }
     Assert.assertEquals(6, numOfSentTsblocks);
@@ -133,7 +131,7 @@ public class LocalSinkHandleTest {
     // Send TsBlocks.
     int numOfSentTsblocks = 0;
     while (localSinkHandle.isFull().isDone()) {
-      localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+      localSinkHandle.send(Utils.createMockTsBlock(mockTsBlockSize));
       numOfSentTsblocks += 1;
     }
     Assert.assertEquals(6, numOfSentTsblocks);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index f8110fcf1d..411370c14a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -41,13 +41,15 @@ import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class SinkHandleTest {
 
   @Test
   public void testOneTimeNotBlockedSend() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -95,18 +97,20 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(1))
+    Mockito.verify(mockMemoryPool, Mockito.times(2))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -148,7 +152,7 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertTrue(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(mockTsBlockSize, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(1))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
@@ -173,7 +177,7 @@ public class SinkHandleTest {
   public void testMultiTimesBlockedSend() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -223,18 +227,20 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(1))
+    Mockito.verify(mockMemoryPool, Mockito.times(2))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -269,19 +275,21 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(1))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(2))
+    Mockito.verify(mockMemoryPool, Mockito.times(3))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -334,7 +342,8 @@ public class SinkHandleTest {
     sinkHandle.acknowledgeTsBlock(numOfMockTsBlock, numOfMockTsBlock * 2);
     Assert.assertTrue(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(2))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
@@ -344,7 +353,7 @@ public class SinkHandleTest {
   public void testFailedSend() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -395,18 +404,20 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(1))
+    Mockito.verify(mockMemoryPool, Mockito.times(2))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -446,7 +457,7 @@ public class SinkHandleTest {
   public void testAbort() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -456,7 +467,9 @@ public class SinkHandleTest {
     // Construct a mock LocalMemoryManager that returns blocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool spyMemoryPool =
-        Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 10 * mockTsBlockSize));
+        Mockito.spy(
+            new MemoryPool(
+                "test", numOfMockTsBlock * mockTsBlockSize, numOfMockTsBlock * mockTsBlockSize));
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
 
     // Construct a mock SinkHandleListener.
@@ -496,20 +509,21 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Future<?> blocked = sinkHandle.isFull();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(blocked.isCancelled());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        2 * mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
-    Assert.assertEquals(2 * numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
 
     sinkHandle.abort();
     Assert.assertTrue(blocked.isDone());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index d68ad6726a..2930abe48b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -59,8 +59,8 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) {
-    this.tsBlocks.addAll(tsBlocks);
+  public void send(TsBlock tsBlock) {
+    this.tsBlocks.add(tsBlock);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
index 211f635636..165f0e65a9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
@@ -54,6 +54,7 @@ public class Utils {
 
   public static MemoryPool createMockBlockedMemoryPool(
       String queryId, int numOfMockTsBlock, long mockTsBlockSize) {
+    long capacityInBytes = numOfMockTsBlock * mockTsBlockSize;
     MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
     AtomicReference<SettableFuture<Void>> settableFuture = new AtomicReference<>();
     settableFuture.set(SettableFuture.create());
@@ -62,8 +63,12 @@ public class Utils {
     Mockito.when(mockMemoryPool.reserve(Mockito.eq(queryId), Mockito.anyLong()))
         .thenAnswer(
             invocation -> {
-              reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(1));
-              settableFuture.set(SettableFuture.create());
+              long bytesToReserve = invocation.getArgument(1);
+              if (reservedBytes.get() + bytesToReserve <= capacityInBytes) {
+                reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(1));
+              } else {
+                settableFuture.set(SettableFuture.create());
+              }
               return settableFuture.get();
             });
     Mockito.doAnswer(
@@ -72,12 +77,12 @@ public class Utils {
                   reservedBytes.updateAndGet(v -> v - (long) invocation.getArgument(1));
                   if (reservedBytes.get() <= 0) {
                     settableFuture.get().set(null);
+                    reservedBytes.updateAndGet(v -> v + mockTsBlockSize);
                   }
                   return null;
                 })
         .when(mockMemoryPool)
         .free(Mockito.eq(queryId), Mockito.anyLong());
-    long capacityInBytes = numOfMockTsBlock * mockTsBlockSize;
     Mockito.when(mockMemoryPool.tryReserve(Mockito.eq(queryId), Mockito.anyLong()))
         .thenAnswer(
             invocation -> {