You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/02 03:14:20 UTC

[iotdb] branch master updated: [IOTDB-2972] implement local sink/source handle (#5732)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a6031a3c1 [IOTDB-2972] implement local sink/source handle (#5732)
8a6031a3c1 is described below

commit 8a6031a3c187953f3989e7604a252ef2ad59e4f0
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Mon May 2 11:14:15 2022 +0800

    [IOTDB-2972] implement local sink/source handle (#5732)
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      | 106 ++++++++++++--
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |  24 +++-
 .../iotdb/db/mpp/buffer/LocalSinkHandle.java       | 146 +++++++++++++++++++
 .../iotdb/db/mpp/buffer/LocalSourceHandle.java     | 125 ++++++++++++++++
 .../iotdb/db/mpp/buffer/SharedTsBlockQueue.java    | 152 ++++++++++++++++++++
 .../iotdb/db/mpp/buffer/DataBlockManagerTest.java  | 121 ++++++++++++++++
 .../iotdb/db/mpp/buffer/LocalSinkHandleTest.java   | 151 +++++++++++++++++++
 .../iotdb/db/mpp/buffer/LocalSourceHandleTest.java | 119 +++++++++++++++
 .../db/mpp/buffer/SharedTsBlockQueueTest.java      | 160 +++++++++++++++++++++
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |   2 +-
 .../java/org/apache/iotdb/db/mpp/buffer/Utils.java |   6 +
 11 files changed, 1094 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 20b7be00bd..debf576e40 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -86,7 +86,7 @@ public class DataBlockManager implements IDataBlockManager {
                 + ".");
       }
       TGetDataBlockResponse resp = new TGetDataBlockResponse();
-      SinkHandle sinkHandle = sinkHandles.get(req.getSourceFragmentInstanceId());
+      SinkHandle sinkHandle = (SinkHandle) sinkHandles.get(req.getSourceFragmentInstanceId());
       for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
         try {
           ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
@@ -111,8 +111,7 @@ public class DataBlockManager implements IDataBlockManager {
                 + e.getSourceFragmentInstanceId()
                 + ".");
       }
-      sinkHandles
-          .get(e.getSourceFragmentInstanceId())
+      ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
           .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
     }
 
@@ -138,7 +137,8 @@ public class DataBlockManager implements IDataBlockManager {
       }
 
       SourceHandle sourceHandle =
-          sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
+          (SourceHandle)
+              sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
       sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
     }
 
@@ -163,9 +163,10 @@ public class DataBlockManager implements IDataBlockManager {
                 + ".");
       }
       SourceHandle sourceHandle =
-          sourceHandles
-              .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap())
-              .get(e.getTargetPlanNodeId());
+          (SourceHandle)
+              sourceHandles
+                  .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap())
+                  .get(e.getTargetPlanNodeId());
       sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
     }
   }
@@ -263,8 +264,8 @@ public class DataBlockManager implements IDataBlockManager {
   private final ExecutorService executorService;
   private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
       dataBlockServiceClientManager;
-  private final Map<TFragmentInstanceId, Map<String, SourceHandle>> sourceHandles;
-  private final Map<TFragmentInstanceId, SinkHandle> sinkHandles;
+  private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
+  private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
 
   private DataBlockServiceImpl dataBlockService;
 
@@ -288,6 +289,47 @@ public class DataBlockManager implements IDataBlockManager {
     return dataBlockService;
   }
 
+  @Override
+  public synchronized ISinkHandle createLocalSinkHandle(
+      TFragmentInstanceId localFragmentInstanceId,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remotePlanNodeId,
+      // TODO: replace with callbacks to decouple DataBlockManager from FragmentInstanceContext
+      FragmentInstanceContext instanceContext) {
+    if (sinkHandles.containsKey(localFragmentInstanceId)) {
+      throw new IllegalStateException(
+          "Local sink handle for " + localFragmentInstanceId + " exists.");
+    }
+
+    logger.debug(
+        "Create local sink handle to plan node {} of {} for {}",
+        remotePlanNodeId,
+        remoteFragmentInstanceId,
+        localFragmentInstanceId);
+
+    SharedTsBlockQueue queue;
+    if (sourceHandles.containsKey(remoteFragmentInstanceId)
+        && sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+      logger.debug("Get shared tsblock queue from local source handle");
+      queue =
+          ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
+              .getSharedTsBlockQueue();
+    } else {
+      logger.debug("Create shared tsblock queue");
+      queue = new SharedTsBlockQueue(remoteFragmentInstanceId, localMemoryManager);
+    }
+
+    LocalSinkHandle localSinkHandle =
+        new LocalSinkHandle(
+            remoteFragmentInstanceId,
+            remotePlanNodeId,
+            localFragmentInstanceId,
+            queue,
+            new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
+    sinkHandles.put(localFragmentInstanceId, localSinkHandle);
+    return localSinkHandle;
+  }
+
   @Override
   public ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
@@ -321,6 +363,48 @@ public class DataBlockManager implements IDataBlockManager {
     return sinkHandle;
   }
 
+  @Override
+  public synchronized ISourceHandle createLocalSourceHandle(
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      IDataBlockManagerCallback<Throwable> onFailureCallback) {
+    if (sourceHandles.containsKey(localFragmentInstanceId)
+        && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+      throw new IllegalStateException(
+          "Source handle for plan node "
+              + localPlanNodeId
+              + " of "
+              + localFragmentInstanceId
+              + " exists.");
+    }
+
+    logger.debug(
+        "Create local source handle from {} for plan node {} of {}",
+        remoteFragmentInstanceId,
+        localPlanNodeId,
+        localFragmentInstanceId);
+    SharedTsBlockQueue queue;
+    if (sinkHandles.containsKey(remoteFragmentInstanceId)) {
+      logger.debug("Get shared tsblock queue from local sink handle");
+      queue = ((LocalSinkHandle) sinkHandles.get(remoteFragmentInstanceId)).getSharedTsBlockQueue();
+    } else {
+      logger.debug("Create shared tsblock queue");
+      queue = new SharedTsBlockQueue(localFragmentInstanceId, localMemoryManager);
+    }
+    LocalSourceHandle localSourceHandle =
+        new LocalSourceHandle(
+            remoteFragmentInstanceId,
+            localFragmentInstanceId,
+            localPlanNodeId,
+            queue,
+            new SourceHandleListenerImpl(onFailureCallback));
+    sourceHandles
+        .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
+        .put(localPlanNodeId, localSourceHandle);
+    return localSourceHandle;
+  }
+
   @Override
   public ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
@@ -376,8 +460,8 @@ public class DataBlockManager implements IDataBlockManager {
       sinkHandles.remove(fragmentInstanceId);
     }
     if (sourceHandles.containsKey(fragmentInstanceId)) {
-      Map<String, SourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
-      for (Entry<String, SourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
+      Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
+      for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
         logger.info("Close source handle {}", sourceHandles);
         entry.getValue().abort();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index 049f8b023d..7eceb835eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -30,14 +30,20 @@ public interface IDataBlockManager {
    *
    * @param localFragmentInstanceId ID of the local fragment instance who generates and sends data
    *     blocks to the sink handle.
-   * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
-   *     be sent to.
+   * @param remoteEndpoint Hostname and port of the remote fragment instance to which the data
+   *     blocks should be sent.
    * @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
    * @param instanceContext The context of local fragment instance.
    */
   ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
-      TEndPoint endpoint,
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remotePlanNodeId,
+      FragmentInstanceContext instanceContext);
+
+  ISinkHandle createLocalSinkHandle(
+      TFragmentInstanceId localFragmentInstanceId,
       TFragmentInstanceId remoteFragmentInstanceId,
       String remotePlanNodeId,
       FragmentInstanceContext instanceContext);
@@ -49,15 +55,21 @@ public interface IDataBlockManager {
    * @param localFragmentInstanceId ID of the local fragment instance who receives data blocks from
    *     the source handle.
    * @param localPlanNodeId The local sink plan node ID.
-   * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
-   *     be received from.
+   * @param remoteEndpoint Hostname and port of the remote fragment instance from which the data
+   *     blocks should be received.
    * @param remoteFragmentInstanceId ID of the remote fragment instance.
    * @param onFailureCallback The callback on failure.
    */
   ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
-      TEndPoint endpoint,
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      IDataBlockManagerCallback<Throwable> onFailureCallback);
+
+  ISourceHandle createLocalSourceHandle(
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
       TFragmentInstanceId remoteFragmentInstanceId,
       IDataBlockManagerCallback<Throwable> onFailureCallback);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandle.java
new file mode 100644
index 0000000000..f3be21c6b6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandle.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class LocalSinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(LocalSinkHandle.class);
+
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remotePlanNodeId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final SinkHandleListener sinkHandleListener;
+
+  private final SharedTsBlockQueue queue;
+  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private boolean aborted = false;
+
+  public LocalSinkHandle(
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remotePlanNodeId,
+      TFragmentInstanceId localFragmentInstanceId,
+      SharedTsBlockQueue queue,
+      SinkHandleListener sinkHandleListener) {
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+    this.queue = Validate.notNull(queue);
+  }
+
+  @Override
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
+    return localFragmentInstanceId;
+  }
+
+  @Override
+  public long getBufferRetainedSizeInBytes() {
+    return queue.getBufferRetainedSizeInBytes();
+  }
+
+  @Override
+  public synchronized ListenableFuture<Void> isFull() {
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    return nonCancellationPropagating(blocked);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return aborted;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+  }
+
+  @Override
+  public synchronized void send(List<TsBlock> tsBlocks) {
+    Validate.notNull(tsBlocks, "tsBlocks is null");
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is aborted.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Sink handle is blocked.");
+    }
+    if (queue.hasNoMoreTsBlocks()) {
+      return;
+    }
+    for (TsBlock tsBlock : tsBlocks) {
+      blocked = queue.add(tsBlock);
+    }
+  }
+
+  @Override
+  public synchronized void send(int partition, List<TsBlock> tsBlocks) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized void setNoMoreTsBlocks() {
+    logger.info("Set no-more-tsblocks to {}.", this);
+    if (aborted) {
+      return;
+    }
+    queue.setNoMoreTsBlocks(true);
+    sinkHandleListener.onEndOfBlocks(this);
+    if (isFinished()) {
+      sinkHandleListener.onFinish(this);
+    }
+    logger.info("No-more-tsblocks has been set to {}.", this);
+  }
+
+  @Override
+  public synchronized void abort() {
+    logger.info("Sink handle {} is being aborted.", this);
+    aborted = true;
+    queue.destroy();
+    sinkHandleListener.onAborted(this);
+    logger.info("Sink handle {} is aborted", this);
+  }
+
+  public TFragmentInstanceId getRemoteFragmentInstanceId() {
+    return remoteFragmentInstanceId;
+  }
+
+  public String getRemotePlanNodeId() {
+    return remotePlanNodeId;
+  }
+
+  SharedTsBlockQueue getSharedTsBlockQueue() {
+    return queue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandle.java
new file mode 100644
index 0000000000..fc50072711
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandle.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class LocalSourceHandle implements ISourceHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(LocalSourceHandle.class);
+
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final String localPlanNodeId;
+  private final SourceHandleListener sourceHandleListener;
+  private final SharedTsBlockQueue queue;
+  private boolean aborted = false;
+
+  public LocalSourceHandle(
+      TFragmentInstanceId remoteFragmentInstanceId,
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      SharedTsBlockQueue queue,
+      SourceHandleListener sourceHandleListener) {
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+    this.queue = Validate.notNull(queue);
+    this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+  }
+
+  @Override
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
+    return localFragmentInstanceId;
+  }
+
+  @Override
+  public String getLocalPlanNodeId() {
+    return localPlanNodeId;
+  }
+
+  @Override
+  public long getBufferRetainedSizeInBytes() {
+    return queue.getBufferRetainedSizeInBytes();
+  }
+
+  @Override
+  public TsBlock receive() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is aborted.");
+    }
+    if (!queue.isBlocked().isDone()) {
+      throw new IllegalStateException("Source handle is blocked.");
+    }
+    TsBlock tsBlock;
+    synchronized (this) {
+      tsBlock = queue.remove();
+    }
+    if (isFinished()) {
+      sourceHandleListener.onFinished(this);
+    }
+    return tsBlock;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is closed.");
+    }
+    return nonCancellationPropagating(queue.isBlocked());
+  }
+
+  @Override
+  public boolean isAborted() {
+    return aborted;
+  }
+
+  @Override
+  public synchronized void abort() {
+    if (aborted) {
+      return;
+    }
+    queue.destroy();
+    aborted = true;
+    sourceHandleListener.onAborted(this);
+  }
+
+  public TFragmentInstanceId getRemoteFragmentInstanceId() {
+    return remoteFragmentInstanceId;
+  }
+
+  SharedTsBlockQueue getSharedTsBlockQueue() {
+    return queue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueue.java
new file mode 100644
index 0000000000..46d2606346
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueue.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.lang3.Validate;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+public class SharedTsBlockQueue {
+
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+
+  @GuardedBy("this")
+  private boolean noMoreTsBlocks = false;
+
+  @GuardedBy("this")
+  private long bufferRetainedSizeInBytes = 0L;
+
+  @GuardedBy("this")
+  private final Queue<TsBlock> queue = new LinkedList<>();
+
+  @GuardedBy("this")
+  private SettableFuture<Void> blocked = SettableFuture.create();
+
+  @GuardedBy("this")
+  private ListenableFuture<Void> blockedOnMemory;
+
+  @GuardedBy("this")
+  private boolean destroyed = false;
+
+  public SharedTsBlockQueue(
+      TFragmentInstanceId fragmentInstanceId, LocalMemoryManager localMemoryManager) {
+    this.localFragmentInstanceId =
+        Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
+    this.localMemoryManager =
+        Validate.notNull(localMemoryManager, "local memory manager cannot be null");
+  }
+
+  public boolean hasNoMoreTsBlocks() {
+    return noMoreTsBlocks;
+  }
+
+  public long getBufferRetainedSizeInBytes() {
+    return bufferRetainedSizeInBytes;
+  }
+
+  public ListenableFuture<Void> isBlocked() {
+    return blocked;
+  }
+
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  /** Notify that no more tsblocks will be added to the queue. */
+  public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
+    if (destroyed) {
+      throw new IllegalStateException("queue has been destroyed");
+    }
+    this.noMoreTsBlocks = noMoreTsBlocks;
+  }
+
+  /**
+   * Remove the tsblock at the head of the queue and return it. Should be invoked only after the
+   * future returned by {@link #isBlocked()} has completed.
+   */
+  public synchronized TsBlock remove() {
+    if (destroyed) {
+      throw new IllegalStateException("queue has been destroyed");
+    }
+    TsBlock tsBlock = queue.remove();
+    localMemoryManager
+        .getQueryPool()
+        .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+    bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+    if (blocked.isDone() && queue.isEmpty()) {
+      blocked = SettableFuture.create();
+    }
+    return tsBlock;
+  }
+
+  /**
+   * Add a tsblock to the queue. Except for the first invocation, this method should be invoked
+   * only after the future returned by the previous invocation has completed.
+   */
+  public synchronized ListenableFuture<Void> add(TsBlock tsBlock) {
+    if (destroyed) {
+      throw new IllegalStateException("queue has been destroyed");
+    }
+
+    Validate.notNull(tsBlock, "tsblock cannot be null");
+    Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+    blockedOnMemory =
+        localMemoryManager
+            .getQueryPool()
+            .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+    bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    queue.add(tsBlock);
+    if (!blocked.isDone()) {
+      blocked.set(null);
+    }
+    return blockedOnMemory;
+  }
+
+  /** Destroy the queue, cancel any pending memory reservation and free the retained memory. */
+  public synchronized void destroy() {
+    if (destroyed) {
+      return;
+    }
+    destroyed = true;
+    if (!blocked.isDone()) {
+      blocked.set(null);
+    }
+    if (blockedOnMemory != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+    }
+    queue.clear();
+    if (bufferRetainedSizeInBytes > 0L) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerTest.java
new file mode 100644
index 0000000000..a57fef344a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Executors;
+
+public class DataBlockManagerTest {
+  @Test
+  public void testCreateLocalSinkHandle() {
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
+    final String remotePlanNodeId = "exchange_0";
+    final FragmentInstanceContext mockFragmentInstanceContext =
+        Mockito.mock(FragmentInstanceContext.class);
+
+    // Construct a mock LocalMemoryManager whose query pool is a spy of a small MemoryPool.
+    LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+    MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", 10240L, 5120L));
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+
+    DataBlockManager dataBlockManager =
+        new DataBlockManager(
+            mockLocalMemoryManager,
+            new TsBlockSerdeFactory(),
+            Executors.newSingleThreadExecutor(),
+            new IClientManager.Factory<TEndPoint, SyncDataNodeDataBlockServiceClient>()
+                .createClientManager(
+                    new DataNodeClientPoolFactory.SyncDataNodeDataBlockServiceClientPoolFactory()));
+
+    ISinkHandle localSinkHandle =
+        dataBlockManager.createLocalSinkHandle(
+            localFragmentInstanceId,
+            remoteFragmentInstanceId,
+            remotePlanNodeId,
+            mockFragmentInstanceContext);
+
+    Assert.assertTrue(localSinkHandle instanceof LocalSinkHandle);
+
+    ISourceHandle localSourceHandle =
+        dataBlockManager.createLocalSourceHandle(
+            remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, t -> {});
+
+    Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
+
+    Assert.assertEquals(
+        ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+        ((LocalSourceHandle) localSourceHandle).getSharedTsBlockQueue());
+  }
+
+  @Test
+  public void testCreateLocalSourceHandle() {
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
+    final String localPlanNodeId = "exchange_0";
+    final FragmentInstanceContext mockFragmentInstanceContext =
+        Mockito.mock(FragmentInstanceContext.class);
+
+    // Construct a mock LocalMemoryManager whose query pool is a spy of a small MemoryPool.
+    LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+    MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", 10240L, 5120L));
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+
+    DataBlockManager dataBlockManager =
+        new DataBlockManager(
+            mockLocalMemoryManager,
+            new TsBlockSerdeFactory(),
+            Executors.newSingleThreadExecutor(),
+            new IClientManager.Factory<TEndPoint, SyncDataNodeDataBlockServiceClient>()
+                .createClientManager(
+                    new DataNodeClientPoolFactory.SyncDataNodeDataBlockServiceClientPoolFactory()));
+
+    ISourceHandle localSourceHandle =
+        dataBlockManager.createLocalSourceHandle(
+            localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId, t -> {});
+
+    Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
+
+    ISinkHandle localSinkHandle =
+        dataBlockManager.createLocalSinkHandle(
+            remoteFragmentInstanceId,
+            localFragmentInstanceId,
+            localPlanNodeId,
+            mockFragmentInstanceContext);
+
+    Assert.assertTrue(localSinkHandle instanceof LocalSinkHandle);
+
+    Assert.assertEquals(
+        ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+        ((LocalSourceHandle) localSourceHandle).getSharedTsBlockQueue());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandleTest.java
new file mode 100644
index 0000000000..ce48ed1667
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandleTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class LocalSinkHandleTest {
+  private static final String QUERY_ID = "q0";
+  private static final String REMOTE_PLAN_NODE_ID = "exchange_0";
+  private static final long TS_BLOCK_SIZE = 1024L * 1024L;
+
+  /**
+   * Sends tsblocks until {@code isFull()} stops returning a completed future, drains the shared
+   * queue, and finally calls {@code setNoMoreTsBlocks()}, checking the handle state, the memory
+   * pool calls and the listener callbacks after every step.
+   */
+  @Test
+  public void testSend() {
+    final TFragmentInstanceId remoteInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
+    final TFragmentInstanceId localInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");
+    final MemoryPool queryPoolSpy = newQueryPoolSpy();
+    final SinkHandleListener listenerMock = Mockito.mock(SinkHandleListener.class);
+    final SharedTsBlockQueue sharedQueue =
+        new SharedTsBlockQueue(remoteInstanceId, newMemoryManagerMock(queryPoolSpy));
+    final LocalSinkHandle sinkHandle =
+        new LocalSinkHandle(
+            remoteInstanceId, REMOTE_PLAN_NODE_ID, localInstanceId, sharedQueue, listenerMock);
+    assertInitialState(sinkHandle);
+
+    // Keep sending one tsblock at a time until isFull() returns a future that is not done.
+    Assert.assertEquals(6, sendWhileNotFull(sinkHandle));
+    Assert.assertFalse(sinkHandle.isFull().isDone());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertEquals(6 * TS_BLOCK_SIZE, sinkHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(queryPoolSpy, Mockito.times(6)).reserve(QUERY_ID, TS_BLOCK_SIZE);
+
+    // Play the consumer: remove every queued tsblock directly from the shared queue.
+    int drained = 0;
+    for (; !sharedQueue.isEmpty(); drained++) {
+      sharedQueue.remove();
+    }
+    Assert.assertEquals(6, drained);
+    Assert.assertTrue(sinkHandle.isFull().isDone());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(queryPoolSpy, Mockito.times(6)).free(QUERY_ID, TS_BLOCK_SIZE);
+
+    // Signal end of data; each of the two listener callbacks is expected exactly once.
+    sinkHandle.setNoMoreTsBlocks();
+    Assert.assertTrue(sinkHandle.isFull().isDone());
+    Assert.assertTrue(sinkHandle.isFinished());
+    Mockito.verify(listenerMock, Mockito.times(1)).onEndOfBlocks(sinkHandle);
+    Mockito.verify(listenerMock, Mockito.times(1)).onFinish(sinkHandle);
+  }
+
+  /**
+   * Sends tsblocks until {@code isFull()} returns a pending future, aborts the handle, and checks
+   * that the pending future completes, the handle reports aborted (not finished), and the
+   * listener's {@code onAborted} is invoked once.
+   */
+  @Test
+  public void testAbort() {
+    final TFragmentInstanceId remoteInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
+    final TFragmentInstanceId localInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");
+    final MemoryPool queryPoolSpy = newQueryPoolSpy();
+    final SinkHandleListener listenerMock = Mockito.mock(SinkHandleListener.class);
+    final SharedTsBlockQueue sharedQueue =
+        new SharedTsBlockQueue(remoteInstanceId, newMemoryManagerMock(queryPoolSpy));
+    final LocalSinkHandle sinkHandle =
+        new LocalSinkHandle(
+            remoteInstanceId, REMOTE_PLAN_NODE_ID, localInstanceId, sharedQueue, listenerMock);
+    assertInitialState(sinkHandle);
+
+    // Keep sending one tsblock at a time until isFull() returns a future that is not done.
+    Assert.assertEquals(6, sendWhileNotFull(sinkHandle));
+    final ListenableFuture<Void> pendingFull = sinkHandle.isFull();
+    Assert.assertFalse(pendingFull.isDone());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertEquals(6 * TS_BLOCK_SIZE, sinkHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(queryPoolSpy, Mockito.times(6)).reserve(QUERY_ID, TS_BLOCK_SIZE);
+
+    // Abort while the future obtained above is still pending.
+    sinkHandle.abort();
+    Assert.assertTrue(pendingFull.isDone());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertTrue(sinkHandle.isAborted());
+    Mockito.verify(listenerMock, Mockito.times(1)).onAborted(sinkHandle);
+  }
+
+  /** Builds a spied query pool with capacity 5 * TS_BLOCK_SIZE per query. */
+  private static MemoryPool newQueryPoolSpy() {
+    return Mockito.spy(new MemoryPool("test", 10 * TS_BLOCK_SIZE, 5 * TS_BLOCK_SIZE));
+  }
+
+  /** Builds a mock LocalMemoryManager whose {@code getQueryPool()} returns the given pool. */
+  private static LocalMemoryManager newMemoryManagerMock(MemoryPool queryPool) {
+    final LocalMemoryManager managerMock = Mockito.mock(LocalMemoryManager.class);
+    Mockito.when(managerMock.getQueryPool()).thenReturn(queryPool);
+    return managerMock;
+  }
+
+  /** Asserts the state both tests expect from a sink handle that has not sent anything yet. */
+  private static void assertInitialState(LocalSinkHandle sinkHandle) {
+    Assert.assertTrue(sinkHandle.isFull().isDone());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertFalse(sinkHandle.isAborted());
+    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+  }
+
+  /**
+   * Sends single mock tsblocks of TS_BLOCK_SIZE while {@code isFull()} returns a completed future.
+   *
+   * @return the number of {@code send} calls made
+   */
+  private static int sendWhileNotFull(LocalSinkHandle sinkHandle) {
+    int sent = 0;
+    while (sinkHandle.isFull().isDone()) {
+      sinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(TS_BLOCK_SIZE)));
+      sent++;
+    }
+    return sent;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandleTest.java
new file mode 100644
index 0000000000..9c9575d9ca
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandleTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocalSourceHandleTest {
+  @Test
+  public void testReceive() {
+    final String queryId = "q0";
+    final long mockTsBlockSize = 1024L * 1024L;
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+    final String localPlanNodeId = "exchange_0";
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+
+    // Construct a mock LocalMemoryManager that do not block any reservation.
+    LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+    MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    // Construct a mock SourceHandleListener.
+    SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
+    // Construct a shared tsblock queue.
+    SharedTsBlockQueue queue =
+        new SharedTsBlockQueue(localFragmentInstanceId, mockLocalMemoryManager);
+
+    LocalSourceHandle localSourceHandle =
+        new LocalSourceHandle(
+            remoteFragmentInstanceId,
+            localFragmentInstanceId,
+            localPlanNodeId,
+            queue,
+            mockSourceHandleListener);
+    Assert.assertFalse(localSourceHandle.isBlocked().isDone());
+    Assert.assertFalse(localSourceHandle.isAborted());
+    Assert.assertFalse(localSourceHandle.isFinished());
+    Assert.assertEquals(0L, localSourceHandle.getBufferRetainedSizeInBytes());
+
+    // Local sink handle produces tsblocks.
+    queue.add(Utils.createMockTsBlock(mockTsBlockSize));
+    queue.setNoMoreTsBlocks(true);
+    Assert.assertTrue(localSourceHandle.isBlocked().isDone());
+    Assert.assertFalse(localSourceHandle.isAborted());
+    Assert.assertFalse(localSourceHandle.isFinished());
+    Assert.assertEquals(mockTsBlockSize, localSourceHandle.getBufferRetainedSizeInBytes());
+
+    // Consume tsblocks.
+    Assert.assertTrue(localSourceHandle.isBlocked().isDone());
+    localSourceHandle.receive();
+    ListenableFuture<Void> blocked = localSourceHandle.isBlocked();
+    Assert.assertFalse(blocked.isDone());
+    Assert.assertFalse(localSourceHandle.isAborted());
+    Assert.assertTrue(localSourceHandle.isFinished());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(localSourceHandle);
+  }
+
+  @Test
+  public void testAbort() {
+    final String queryId = "q0";
+    final long mockTsBlockSize = 1024L * 1024L;
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+    final String localPlanNodeId = "exchange_0";
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+
+    // Construct a mock LocalMemoryManager that do not block any reservation.
+    LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+    MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    // Construct a mock SourceHandleListener.
+    SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
+    // Construct a shared tsblock queue.
+    SharedTsBlockQueue queue =
+        new SharedTsBlockQueue(localFragmentInstanceId, mockLocalMemoryManager);
+
+    LocalSourceHandle localSourceHandle =
+        new LocalSourceHandle(
+            remoteFragmentInstanceId,
+            localFragmentInstanceId,
+            localPlanNodeId,
+            queue,
+            mockSourceHandleListener);
+    ListenableFuture<Void> future = localSourceHandle.isBlocked();
+    Assert.assertFalse(future.isDone());
+    Assert.assertFalse(localSourceHandle.isAborted());
+    Assert.assertFalse(localSourceHandle.isFinished());
+    Assert.assertEquals(0L, localSourceHandle.getBufferRetainedSizeInBytes());
+
+    // Close the local source handle.
+    localSourceHandle.abort();
+    Assert.assertTrue(future.isDone());
+    Assert.assertTrue(localSourceHandle.isAborted());
+    Assert.assertFalse(localSourceHandle.isFinished());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onAborted(localSourceHandle);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueueTest.java
new file mode 100644
index 0000000000..5b79f10c41
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueueTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SharedTsBlockQueueTest {
+  /**
+   * Drives one {@link SendTask} and one {@link ReceiveTask} against the same queue on a
+   * two-thread pool and polls their counters every 10 ms, printing progress, until the loop
+   * condition below stops holding or the JUnit timeout fires. The pool is always shut down before
+   * the test returns.
+   */
+  @Test(timeout = 5000L)
+  public void concurrencyTest() {
+    final String queryId = "q0";
+    final long mockTsBlockSize = 1024L * 1024L;
+
+    // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
+    LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+    MemoryPool spyMemoryPool =
+        Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+    SharedTsBlockQueue queue =
+        new SharedTsBlockQueue(new TFragmentInstanceId(queryId, 0, "0"), mockLocalMemoryManager);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      AtomicReference<Integer> numOfTimesSenderBlocked = new AtomicReference<>(0);
+      AtomicReference<Integer> numOfTimesReceiverBlocked = new AtomicReference<>(0);
+      AtomicReference<Integer> numOfTsBlocksToSend = new AtomicReference<>(1000);
+      AtomicReference<Integer> numOfTsBlocksToReceive = new AtomicReference<>(1000);
+      executor.submit(
+          new SendTask(
+              queue, mockTsBlockSize, numOfTsBlocksToSend, numOfTimesSenderBlocked, executor));
+      executor.submit(
+          new ReceiveTask(queue, numOfTsBlocksToReceive, numOfTimesReceiverBlocked, executor));
+
+      // NOTE(review): because the two conditions are joined with `&&`, this loop ends as soon as
+      // EITHER counter reaches zero. If the intent is to wait until both the sender and the
+      // receiver are done, this should be `||` - confirm against SharedTsBlockQueue's
+      // thread-safety guarantees before changing it.
+      while (numOfTsBlocksToSend.get() != 0 && numOfTsBlocksToReceive.get() != 0) {
+        String message =
+            String.format(
+                "Sender %d: %d, Receiver %d: %d",
+                numOfTimesSenderBlocked.get(),
+                numOfTsBlocksToSend.get(),
+                numOfTimesReceiverBlocked.get(),
+                numOfTsBlocksToReceive.get());
+        System.out.println(message);
+        try {
+          Thread.sleep(10L);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag before failing so callers up the stack can still see it.
+          Thread.currentThread().interrupt();
+          Assert.fail(e.getMessage());
+        }
+      }
+    } finally {
+      // Without this the pool threads, and any tasks that keep re-scheduling themselves on the
+      // pool, would outlive the test method.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Producer side of the test. Each run adds mock tsblocks to the queue until either the
+   * remaining-count reaches zero or {@code add} returns a future that is not done, and then
+   * re-schedules a fresh task as a listener on the last returned future. A run that finds nothing
+   * left to send marks the queue as having no more tsblocks.
+   */
+  private static class SendTask implements Runnable {
+
+    private final SharedTsBlockQueue queue;
+    private final long mockTsBlockSize;
+    private final AtomicReference<Integer> numOfTsBlocksToSend;
+    private final AtomicReference<Integer> numOfTimesBlocked;
+    private final ExecutorService executor;
+
+    /**
+     * @param queue queue to add tsblocks to; must not be null
+     * @param mockTsBlockSize retained size reported by every mock tsblock; must be positive
+     * @param numOfTsBlocksToSend shared counter of tsblocks still to be sent; must not be null
+     * @param numOfTimesBlocked shared counter bumped on every re-scheduling; must not be null
+     * @param executor executor on which follow-up tasks are run; must not be null
+     */
+    public SendTask(
+        SharedTsBlockQueue queue,
+        long mockTsBlockSize,
+        AtomicReference<Integer> numOfTsBlocksToSend,
+        AtomicReference<Integer> numOfTimesBlocked,
+        ExecutorService executor) {
+      this.queue = Validate.notNull(queue);
+      Validate.isTrue(mockTsBlockSize > 0L);
+      this.mockTsBlockSize = mockTsBlockSize;
+      this.numOfTsBlocksToSend = Validate.notNull(numOfTsBlocksToSend);
+      this.numOfTimesBlocked = Validate.notNull(numOfTimesBlocked);
+      this.executor = Validate.notNull(executor);
+    }
+
+    @Override
+    public void run() {
+      ListenableFuture<Void> blockedOnMemory = null;
+      while (numOfTsBlocksToSend.get() > 0) {
+        blockedOnMemory = queue.add(Utils.createMockTsBlock(mockTsBlockSize));
+        numOfTsBlocksToSend.updateAndGet(v -> v - 1);
+        if (!blockedOnMemory.isDone()) {
+          break;
+        }
+      }
+
+      if (blockedOnMemory != null) {
+        // At least one tsblock was added in this run. The counter is bumped even when the loop
+        // ended because the remaining-count hit zero rather than because the future was pending.
+        numOfTimesBlocked.updateAndGet(v -> v + 1);
+        blockedOnMemory.addListener(
+            new SendTask(queue, mockTsBlockSize, numOfTsBlocksToSend, numOfTimesBlocked, executor),
+            executor);
+      } else {
+        // Nothing was left to send when this run started.
+        queue.setNoMoreTsBlocks(true);
+      }
+    }
+  }
+
+  /**
+   * Consumer side of the test. Each run removes tsblocks from the queue while the remaining-count
+   * is positive and {@code isBlocked()} returns a completed future, and then re-schedules a fresh
+   * task as a listener on the last future it looked at. A run that finds nothing left to receive
+   * does nothing.
+   */
+  private static class ReceiveTask implements Runnable {
+
+    private final SharedTsBlockQueue queue;
+    private final AtomicReference<Integer> numOfTsBlocksToReceive;
+    private final AtomicReference<Integer> numOfTimesBlocked;
+    private final ExecutorService executor;
+
+    /**
+     * @param queue queue to remove tsblocks from; must not be null
+     * @param numOfTsBlocksToReceive shared counter of tsblocks still to be received; must not be
+     *     null
+     * @param numOfTimesBlocked shared counter bumped on every re-scheduling; must not be null
+     * @param executor executor on which follow-up tasks are run; must not be null
+     */
+    public ReceiveTask(
+        SharedTsBlockQueue queue,
+        AtomicReference<Integer> numOfTsBlocksToReceive,
+        AtomicReference<Integer> numOfTimesBlocked,
+        ExecutorService executor) {
+      this.queue = Validate.notNull(queue);
+      this.numOfTsBlocksToReceive = Validate.notNull(numOfTsBlocksToReceive);
+      this.numOfTimesBlocked = Validate.notNull(numOfTimesBlocked);
+      this.executor = Validate.notNull(executor);
+    }
+
+    @Override
+    public void run() {
+      ListenableFuture<Void> blocked = null;
+      while (numOfTsBlocksToReceive.get() > 0) {
+        blocked = queue.isBlocked();
+        if (blocked.isDone()) {
+          queue.remove();
+          numOfTsBlocksToReceive.updateAndGet(v -> v - 1);
+        } else {
+          break;
+        }
+      }
+
+      if (blocked != null) {
+        // The loop body ran at least once; continue from a new task once the future is done.
+        numOfTimesBlocked.updateAndGet(v -> v + 1);
+        blocked.addListener(
+            new ReceiveTask(queue, numOfTsBlocksToReceive, numOfTimesBlocked, executor), executor);
+      }
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index c8c6c00329..ef5b83344a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -175,7 +175,7 @@ public class SourceHandleTest {
     final String localPlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
 
-    // Construct a mock LocalMemoryManager with capacity 3 * mockTsBlockSize.
+    // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool spyMemoryPool =
         Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
index b0f778d989..5b60925fe7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
@@ -46,6 +46,12 @@ public class Utils {
     return mockTsBlocks;
   }
 
+  /**
+   * Creates a Mockito mock of {@link TsBlock} whose {@code getRetainedSizeInBytes()} is stubbed
+   * to return the given size. No other method is stubbed here.
+   *
+   * @param mockTsBlockSize value the mock reports as its retained size in bytes
+   * @return the stubbed mock
+   */
+  public static TsBlock createMockTsBlock(long mockTsBlockSize) {
+    final TsBlock stubbedTsBlock = Mockito.mock(TsBlock.class);
+    Mockito.doReturn(mockTsBlockSize).when(stubbedTsBlock).getRetainedSizeInBytes();
+    return stubbedTsBlock;
+  }
+
   public static MemoryPool createMockBlockedMemoryPool(
       String queryId, int numOfMockTsBlock, long mockTsBlockSize) {
     MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);