You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/02 03:14:20 UTC
[iotdb] branch master updated: [IOTDB-2972] implement local sink/source handle (#5732)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8a6031a3c1 [IOTDB-2972] implement local sink/source handle (#5732)
8a6031a3c1 is described below
commit 8a6031a3c187953f3989e7604a252ef2ad59e4f0
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Mon May 2 11:14:15 2022 +0800
[IOTDB-2972] implement local sink/source handle (#5732)
---
.../iotdb/db/mpp/buffer/DataBlockManager.java | 106 ++++++++++++--
.../iotdb/db/mpp/buffer/IDataBlockManager.java | 24 +++-
.../iotdb/db/mpp/buffer/LocalSinkHandle.java | 146 +++++++++++++++++++
.../iotdb/db/mpp/buffer/LocalSourceHandle.java | 125 ++++++++++++++++
.../iotdb/db/mpp/buffer/SharedTsBlockQueue.java | 152 ++++++++++++++++++++
.../iotdb/db/mpp/buffer/DataBlockManagerTest.java | 121 ++++++++++++++++
.../iotdb/db/mpp/buffer/LocalSinkHandleTest.java | 151 +++++++++++++++++++
.../iotdb/db/mpp/buffer/LocalSourceHandleTest.java | 119 +++++++++++++++
.../db/mpp/buffer/SharedTsBlockQueueTest.java | 160 +++++++++++++++++++++
.../iotdb/db/mpp/buffer/SourceHandleTest.java | 2 +-
.../java/org/apache/iotdb/db/mpp/buffer/Utils.java | 6 +
11 files changed, 1094 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 20b7be00bd..debf576e40 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -86,7 +86,7 @@ public class DataBlockManager implements IDataBlockManager {
+ ".");
}
TGetDataBlockResponse resp = new TGetDataBlockResponse();
- SinkHandle sinkHandle = sinkHandles.get(req.getSourceFragmentInstanceId());
+ SinkHandle sinkHandle = (SinkHandle) sinkHandles.get(req.getSourceFragmentInstanceId());
for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
try {
ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
@@ -111,8 +111,7 @@ public class DataBlockManager implements IDataBlockManager {
+ e.getSourceFragmentInstanceId()
+ ".");
}
- sinkHandles
- .get(e.getSourceFragmentInstanceId())
+ ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
.acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
}
@@ -138,7 +137,8 @@ public class DataBlockManager implements IDataBlockManager {
}
SourceHandle sourceHandle =
- sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
+ (SourceHandle)
+ sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
}
@@ -163,9 +163,10 @@ public class DataBlockManager implements IDataBlockManager {
+ ".");
}
SourceHandle sourceHandle =
- sourceHandles
- .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap())
- .get(e.getTargetPlanNodeId());
+ (SourceHandle)
+ sourceHandles
+ .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap())
+ .get(e.getTargetPlanNodeId());
sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
}
}
@@ -263,8 +264,8 @@ public class DataBlockManager implements IDataBlockManager {
private final ExecutorService executorService;
private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
dataBlockServiceClientManager;
- private final Map<TFragmentInstanceId, Map<String, SourceHandle>> sourceHandles;
- private final Map<TFragmentInstanceId, SinkHandle> sinkHandles;
+ private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
+ private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
private DataBlockServiceImpl dataBlockService;
@@ -288,6 +289,47 @@ public class DataBlockManager implements IDataBlockManager {
return dataBlockService;
}
+ @Override
+ public synchronized ISinkHandle createLocalSinkHandle(
+ TFragmentInstanceId localFragmentInstanceId,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ String remotePlanNodeId,
+ // TODO: replace with callbacks to decouple DataBlockManager from FragmentInstanceContext
+ FragmentInstanceContext instanceContext) {
+ if (sinkHandles.containsKey(localFragmentInstanceId)) {
+ throw new IllegalStateException(
+ "Local sink handle for " + localFragmentInstanceId + " exists.");
+ }
+
+ logger.debug(
+ "Create local sink handle to plan node {} of {} for {}",
+ remotePlanNodeId,
+ remoteFragmentInstanceId,
+ localFragmentInstanceId);
+
+ SharedTsBlockQueue queue;
+ if (sourceHandles.containsKey(remoteFragmentInstanceId)
+ && sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+ logger.debug("Get shared tsblock queue from local source handle");
+ queue =
+ ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
+ .getSharedTsBlockQueue();
+ } else {
+ logger.debug("Create shared tsblock queue");
+ queue = new SharedTsBlockQueue(remoteFragmentInstanceId, localMemoryManager);
+ }
+
+ LocalSinkHandle localSinkHandle =
+ new LocalSinkHandle(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localFragmentInstanceId,
+ queue,
+ new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
+ sinkHandles.put(localFragmentInstanceId, localSinkHandle);
+ return localSinkHandle;
+ }
+
@Override
public ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
@@ -321,6 +363,48 @@ public class DataBlockManager implements IDataBlockManager {
return sinkHandle;
}
+ @Override
+ public synchronized ISourceHandle createLocalSourceHandle(
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ IDataBlockManagerCallback<Throwable> onFailureCallback) {
+ if (sourceHandles.containsKey(localFragmentInstanceId)
+ && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+ throw new IllegalStateException(
+ "Source handle for plan node "
+ + localPlanNodeId
+ + " of "
+ + localFragmentInstanceId
+ + " exists.");
+ }
+
+ logger.debug(
+ "Create local source handle from {} for plan node {} of {}",
+ remoteFragmentInstanceId,
+ localPlanNodeId,
+ localFragmentInstanceId);
+ SharedTsBlockQueue queue;
+ if (sinkHandles.containsKey(remoteFragmentInstanceId)) {
+ logger.debug("Get shared tsblock queue from local sink handle");
+ queue = ((LocalSinkHandle) sinkHandles.get(remoteFragmentInstanceId)).getSharedTsBlockQueue();
+ } else {
+ logger.debug("Create shared tsblock queue");
+ queue = new SharedTsBlockQueue(localFragmentInstanceId, localMemoryManager);
+ }
+ LocalSourceHandle localSourceHandle =
+ new LocalSourceHandle(
+ remoteFragmentInstanceId,
+ localFragmentInstanceId,
+ localPlanNodeId,
+ queue,
+ new SourceHandleListenerImpl(onFailureCallback));
+ sourceHandles
+ .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
+ .put(localPlanNodeId, localSourceHandle);
+ return localSourceHandle;
+ }
+
@Override
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
@@ -376,8 +460,8 @@ public class DataBlockManager implements IDataBlockManager {
sinkHandles.remove(fragmentInstanceId);
}
if (sourceHandles.containsKey(fragmentInstanceId)) {
- Map<String, SourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
- for (Entry<String, SourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
+ Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
+ for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
logger.info("Close source handle {}", sourceHandles);
entry.getValue().abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index 049f8b023d..7eceb835eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -30,14 +30,20 @@ public interface IDataBlockManager {
*
* @param localFragmentInstanceId ID of the local fragment instance who generates and sends data
* blocks to the sink handle.
- * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
- * be sent to.
+ * @param remoteEndpoint Hostname and Port of the remote fragment instance where the data blocks
+ * should be sent to.
* @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
* @param instanceContext The context of local fragment instance.
*/
ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
- TEndPoint endpoint,
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ String remotePlanNodeId,
+ FragmentInstanceContext instanceContext);
+
+ ISinkHandle createLocalSinkHandle(
+ TFragmentInstanceId localFragmentInstanceId,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
FragmentInstanceContext instanceContext);
@@ -49,15 +55,21 @@ public interface IDataBlockManager {
* @param localFragmentInstanceId ID of the local fragment instance who receives data blocks from
* the source handle.
* @param localPlanNodeId The local sink plan node ID.
- * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
- * be received from.
+ * @param remoteEndpoint Hostname and Port of the remote fragment instance where the data blocks
+ * should be received from.
* @param remoteFragmentInstanceId ID of the remote fragment instance.
* @param onFailureCallback The callback on failure.
*/
ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
- TEndPoint endpoint,
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ IDataBlockManagerCallback<Throwable> onFailureCallback);
+
+ ISourceHandle createLocalSourceHandle(
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
TFragmentInstanceId remoteFragmentInstanceId,
IDataBlockManagerCallback<Throwable> onFailureCallback);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandle.java
new file mode 100644
index 0000000000..f3be21c6b6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandle.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class LocalSinkHandle implements ISinkHandle {
+
+ private static final Logger logger = LoggerFactory.getLogger(LocalSinkHandle.class);
+
+ private final TFragmentInstanceId remoteFragmentInstanceId;
+ private final String remotePlanNodeId;
+ private final TFragmentInstanceId localFragmentInstanceId;
+ private final SinkHandleListener sinkHandleListener;
+
+ private final SharedTsBlockQueue queue;
+ private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+ private boolean aborted = false;
+
+ public LocalSinkHandle(
+ TFragmentInstanceId remoteFragmentInstanceId,
+ String remotePlanNodeId,
+ TFragmentInstanceId localFragmentInstanceId,
+ SharedTsBlockQueue queue,
+ SinkHandleListener sinkHandleListener) {
+ this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+ this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
+ this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+ this.queue = Validate.notNull(queue);
+ }
+
+ @Override
+ public TFragmentInstanceId getLocalFragmentInstanceId() {
+ return localFragmentInstanceId;
+ }
+
+ @Override
+ public long getBufferRetainedSizeInBytes() {
+ return queue.getBufferRetainedSizeInBytes();
+ }
+
+ @Override
+ public synchronized ListenableFuture<Void> isFull() {
+ if (aborted) {
+ throw new IllegalStateException("Sink handle is closed.");
+ }
+ return nonCancellationPropagating(blocked);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return aborted;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+ }
+
+ @Override
+ public synchronized void send(List<TsBlock> tsBlocks) {
+ Validate.notNull(tsBlocks, "tsBlocks is null");
+ if (aborted) {
+ throw new IllegalStateException("Sink handle is aborted.");
+ }
+ if (!blocked.isDone()) {
+ throw new IllegalStateException("Sink handle is blocked.");
+ }
+ if (queue.hasNoMoreTsBlocks()) {
+ return;
+ }
+ for (TsBlock tsBlock : tsBlocks) {
+ blocked = queue.add(tsBlock);
+ }
+ }
+
+ @Override
+ public synchronized void send(int partition, List<TsBlock> tsBlocks) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized void setNoMoreTsBlocks() {
+ logger.info("Set no-more-tsblocks to {}.", this);
+ if (aborted) {
+ return;
+ }
+ queue.setNoMoreTsBlocks(true);
+ sinkHandleListener.onEndOfBlocks(this);
+ if (isFinished()) {
+ sinkHandleListener.onFinish(this);
+ }
+ logger.info("No-more-tsblocks has been set to {}.", this);
+ }
+
+ @Override
+ public synchronized void abort() {
+ logger.info("Sink handle {} is being aborted.", this);
+ aborted = true;
+ queue.destroy();
+ sinkHandleListener.onAborted(this);
+ logger.info("Sink handle {} is aborted", this);
+ }
+
+ public TFragmentInstanceId getRemoteFragmentInstanceId() {
+ return remoteFragmentInstanceId;
+ }
+
+ public String getRemotePlanNodeId() {
+ return remotePlanNodeId;
+ }
+
+ SharedTsBlockQueue getSharedTsBlockQueue() {
+ return queue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandle.java
new file mode 100644
index 0000000000..fc50072711
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandle.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class LocalSourceHandle implements ISourceHandle {
+
+ private static final Logger logger = LoggerFactory.getLogger(LocalSourceHandle.class);
+
+ private final TFragmentInstanceId remoteFragmentInstanceId;
+ private final TFragmentInstanceId localFragmentInstanceId;
+ private final String localPlanNodeId;
+ private final SourceHandleListener sourceHandleListener;
+ private final SharedTsBlockQueue queue;
+ private boolean aborted = false;
+
+ public LocalSourceHandle(
+ TFragmentInstanceId remoteFragmentInstanceId,
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ SharedTsBlockQueue queue,
+ SourceHandleListener sourceHandleListener) {
+ this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+ this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+ this.queue = Validate.notNull(queue);
+ this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+ }
+
+ @Override
+ public TFragmentInstanceId getLocalFragmentInstanceId() {
+ return localFragmentInstanceId;
+ }
+
+ @Override
+ public String getLocalPlanNodeId() {
+ return localPlanNodeId;
+ }
+
+ @Override
+ public long getBufferRetainedSizeInBytes() {
+ return queue.getBufferRetainedSizeInBytes();
+ }
+
+ @Override
+ public TsBlock receive() {
+ if (aborted) {
+ throw new IllegalStateException("Source handle is aborted.");
+ }
+ if (!queue.isBlocked().isDone()) {
+ throw new IllegalStateException("Source handle is blocked.");
+ }
+ TsBlock tsBlock;
+ synchronized (this) {
+ tsBlock = queue.remove();
+ }
+ if (isFinished()) {
+ sourceHandleListener.onFinished(this);
+ }
+ return tsBlock;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return queue.hasNoMoreTsBlocks() && queue.isEmpty();
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ if (aborted) {
+ throw new IllegalStateException("Source handle is closed.");
+ }
+ return nonCancellationPropagating(queue.isBlocked());
+ }
+
+ @Override
+ public boolean isAborted() {
+ return aborted;
+ }
+
+ @Override
+ public synchronized void abort() {
+ if (aborted) {
+ return;
+ }
+ queue.destroy();
+ aborted = true;
+ sourceHandleListener.onAborted(this);
+ }
+
+ public TFragmentInstanceId getRemoteFragmentInstanceId() {
+ return remoteFragmentInstanceId;
+ }
+
+ SharedTsBlockQueue getSharedTsBlockQueue() {
+ return queue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueue.java
new file mode 100644
index 0000000000..46d2606346
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueue.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.lang3.Validate;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+public class SharedTsBlockQueue {
+
+ private final TFragmentInstanceId localFragmentInstanceId;
+ private final LocalMemoryManager localMemoryManager;
+
+ @GuardedBy("this")
+ private boolean noMoreTsBlocks = false;
+
+ @GuardedBy("this")
+ private long bufferRetainedSizeInBytes = 0L;
+
+ @GuardedBy("this")
+ private final Queue<TsBlock> queue = new LinkedList<>();
+
+ @GuardedBy("this")
+ private SettableFuture<Void> blocked = SettableFuture.create();
+
+ @GuardedBy("this")
+ private ListenableFuture<Void> blockedOnMemory;
+
+ @GuardedBy("this")
+ private boolean destroyed = false;
+
+ public SharedTsBlockQueue(
+ TFragmentInstanceId fragmentInstanceId, LocalMemoryManager localMemoryManager) {
+ this.localFragmentInstanceId =
+ Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
+ this.localMemoryManager =
+ Validate.notNull(localMemoryManager, "local memory manager cannot be null");
+ }
+
+ public boolean hasNoMoreTsBlocks() {
+ return noMoreTsBlocks;
+ }
+
+ public long getBufferRetainedSizeInBytes() {
+ return bufferRetainedSizeInBytes;
+ }
+
+ public ListenableFuture<Void> isBlocked() {
+ return blocked;
+ }
+
+ public boolean isEmpty() {
+ return queue.isEmpty();
+ }
+
+ /** Notify no more tsblocks will be added to the queue. */
+ public synchronized void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
+ if (destroyed) {
+ throw new IllegalStateException("queue has been destroyed");
+ }
+ this.noMoreTsBlocks = noMoreTsBlocks;
+ }
+
+ /**
+ * Remove a tsblock from the head of the queue and return. Should be invoked only when the future
+ * returned by {@link #isBlocked()} completes.
+ */
+ public synchronized TsBlock remove() {
+ if (destroyed) {
+ throw new IllegalStateException("queue has been destroyed");
+ }
+ TsBlock tsBlock = queue.remove();
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+ bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+ if (blocked.isDone() && queue.isEmpty()) {
+ blocked = SettableFuture.create();
+ }
+ return tsBlock;
+ }
+
+ /**
+ * Add tsblocks to the queue. Except the first invocation, this method should be invoked only when
+ * the returned future of last invocation completes.
+ */
+ public synchronized ListenableFuture<Void> add(TsBlock tsBlock) {
+ if (destroyed) {
+ throw new IllegalStateException("queue has been destroyed");
+ }
+
+ Validate.notNull(tsBlock, "tsblock cannot be null");
+ Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+ blockedOnMemory =
+ localMemoryManager
+ .getQueryPool()
+ .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+ bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+ queue.add(tsBlock);
+ if (!blocked.isDone()) {
+ blocked.set(null);
+ }
+ return blockedOnMemory;
+ }
+
+ /** Destroy the queue and cancel the future. */
+ public synchronized void destroy() {
+ if (destroyed) {
+ return;
+ }
+ destroyed = true;
+ if (!blocked.isDone()) {
+ blocked.set(null);
+ }
+ if (blockedOnMemory != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+ }
+ queue.clear();
+ if (bufferRetainedSizeInBytes > 0L) {
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+ bufferRetainedSizeInBytes = 0;
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerTest.java
new file mode 100644
index 0000000000..a57fef344a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Executors;
+
+public class DataBlockManagerTest {
+ @Test
+ public void testCreateLocalSinkHandle() {
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
+ final String remotePlanNodeId = "exchange_0";
+ final FragmentInstanceContext mockFragmentInstanceContext =
+ Mockito.mock(FragmentInstanceContext.class);
+
+ // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", 10240L, 5120L));
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+
+ DataBlockManager dataBlockManager =
+ new DataBlockManager(
+ mockLocalMemoryManager,
+ new TsBlockSerdeFactory(),
+ Executors.newSingleThreadExecutor(),
+ new IClientManager.Factory<TEndPoint, SyncDataNodeDataBlockServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncDataNodeDataBlockServiceClientPoolFactory()));
+
+ ISinkHandle localSinkHandle =
+ dataBlockManager.createLocalSinkHandle(
+ localFragmentInstanceId,
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ mockFragmentInstanceContext);
+
+ Assert.assertTrue(localSinkHandle instanceof LocalSinkHandle);
+
+ ISourceHandle localSourceHandle =
+ dataBlockManager.createLocalSourceHandle(
+ remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, t -> {});
+
+ Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
+
+ Assert.assertEquals(
+ ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+ ((LocalSourceHandle) localSourceHandle).getSharedTsBlockQueue());
+ }
+
+ @Test
+ public void testCreateLocalSourceHandle() {
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId("q0", 1, "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
+ final String localPlanNodeId = "exchange_0";
+ final FragmentInstanceContext mockFragmentInstanceContext =
+ Mockito.mock(FragmentInstanceContext.class);
+
+ // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool spyMemoryPool = Mockito.spy(new MemoryPool("test", 10240L, 5120L));
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+
+ DataBlockManager dataBlockManager =
+ new DataBlockManager(
+ mockLocalMemoryManager,
+ new TsBlockSerdeFactory(),
+ Executors.newSingleThreadExecutor(),
+ new IClientManager.Factory<TEndPoint, SyncDataNodeDataBlockServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncDataNodeDataBlockServiceClientPoolFactory()));
+
+ ISourceHandle localSourceHandle =
+ dataBlockManager.createLocalSourceHandle(
+ localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId, t -> {});
+
+ Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
+
+ ISinkHandle localSinkHandle =
+ dataBlockManager.createLocalSinkHandle(
+ remoteFragmentInstanceId,
+ localFragmentInstanceId,
+ localPlanNodeId,
+ mockFragmentInstanceContext);
+
+ Assert.assertTrue(localSinkHandle instanceof LocalSinkHandle);
+
+ Assert.assertEquals(
+ ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+ ((LocalSourceHandle) localSourceHandle).getSharedTsBlockQueue());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandleTest.java
new file mode 100644
index 0000000000..ce48ed1667
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSinkHandleTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class LocalSinkHandleTest {
+ @Test
+ public void testSend() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L * 1024L;
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+ final String remotePlanNodeId = "exchange_0";
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+
+ // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool spyMemoryPool =
+ Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+ // Construct a mock SinkHandleListener.
+ SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
+ // Construct a shared tsblock queue.
+ SharedTsBlockQueue queue =
+ new SharedTsBlockQueue(remoteFragmentInstanceId, mockLocalMemoryManager);
+
+ // Construct SinkHandle.
+ LocalSinkHandle localSinkHandle =
+ new LocalSinkHandle(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localFragmentInstanceId,
+ queue,
+ mockSinkHandleListener);
+ Assert.assertTrue(localSinkHandle.isFull().isDone());
+ Assert.assertFalse(localSinkHandle.isFinished());
+ Assert.assertFalse(localSinkHandle.isAborted());
+ Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
+
+ // Send tsblocks.
+ int numOfSentTsblocks = 0;
+ while (localSinkHandle.isFull().isDone()) {
+ localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+ numOfSentTsblocks += 1;
+ }
+ Assert.assertEquals(6, numOfSentTsblocks);
+ Assert.assertFalse(localSinkHandle.isFull().isDone());
+ Assert.assertFalse(localSinkHandle.isFinished());
+ Assert.assertEquals(6 * mockTsBlockSize, localSinkHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(spyMemoryPool, Mockito.times(6)).reserve(queryId, mockTsBlockSize);
+
+ // Receive tsblocks.
+ int numOfReceivedTsblocks = 0;
+ while (!queue.isEmpty()) {
+ queue.remove();
+ numOfReceivedTsblocks += 1;
+ }
+ Assert.assertEquals(6, numOfReceivedTsblocks);
+ Assert.assertTrue(localSinkHandle.isFull().isDone());
+ Assert.assertFalse(localSinkHandle.isFinished());
+ Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(spyMemoryPool, Mockito.times(6)).free(queryId, mockTsBlockSize);
+
+ // Set no-more-tsblocks.
+ localSinkHandle.setNoMoreTsBlocks();
+ Assert.assertTrue(localSinkHandle.isFull().isDone());
+ Assert.assertTrue(localSinkHandle.isFinished());
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onEndOfBlocks(localSinkHandle);
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(localSinkHandle);
+ }
+
+ @Test
+ public void testAbort() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L * 1024L;
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+ final String remotePlanNodeId = "exchange_0";
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+
+ // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool spyMemoryPool =
+ Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+ // Construct a mock SinkHandleListener.
+ SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
+ // Construct a shared tsblock queue.
+ SharedTsBlockQueue queue =
+ new SharedTsBlockQueue(remoteFragmentInstanceId, mockLocalMemoryManager);
+
+ // Construct SinkHandle.
+ LocalSinkHandle localSinkHandle =
+ new LocalSinkHandle(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localFragmentInstanceId,
+ queue,
+ mockSinkHandleListener);
+ Assert.assertTrue(localSinkHandle.isFull().isDone());
+ Assert.assertFalse(localSinkHandle.isFinished());
+ Assert.assertFalse(localSinkHandle.isAborted());
+ Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
+
+ // Send tsblocks.
+ int numOfSentTsblocks = 0;
+ while (localSinkHandle.isFull().isDone()) {
+ localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+ numOfSentTsblocks += 1;
+ }
+ Assert.assertEquals(6, numOfSentTsblocks);
+ ListenableFuture<Void> blocked = localSinkHandle.isFull();
+ Assert.assertFalse(blocked.isDone());
+ Assert.assertFalse(localSinkHandle.isFinished());
+ Assert.assertEquals(6 * mockTsBlockSize, localSinkHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(spyMemoryPool, Mockito.times(6)).reserve(queryId, mockTsBlockSize);
+
+ // Abort.
+ localSinkHandle.abort();
+ Assert.assertTrue(blocked.isDone());
+ Assert.assertFalse(localSinkHandle.isFinished());
+ Assert.assertTrue(localSinkHandle.isAborted());
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(localSinkHandle);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandleTest.java
new file mode 100644
index 0000000000..9c9575d9ca
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/LocalSourceHandleTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocalSourceHandleTest {
+ @Test
+ public void testReceive() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L * 1024L;
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+ final String localPlanNodeId = "exchange_0";
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+
+ // Construct a mock LocalMemoryManager that do not block any reservation.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+ // Construct a mock SourceHandleListener.
+ SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
+ // Construct a shared tsblock queue.
+ SharedTsBlockQueue queue =
+ new SharedTsBlockQueue(localFragmentInstanceId, mockLocalMemoryManager);
+
+ LocalSourceHandle localSourceHandle =
+ new LocalSourceHandle(
+ remoteFragmentInstanceId,
+ localFragmentInstanceId,
+ localPlanNodeId,
+ queue,
+ mockSourceHandleListener);
+ Assert.assertFalse(localSourceHandle.isBlocked().isDone());
+ Assert.assertFalse(localSourceHandle.isAborted());
+ Assert.assertFalse(localSourceHandle.isFinished());
+ Assert.assertEquals(0L, localSourceHandle.getBufferRetainedSizeInBytes());
+
+ // Local sink handle produces tsblocks.
+ queue.add(Utils.createMockTsBlock(mockTsBlockSize));
+ queue.setNoMoreTsBlocks(true);
+ Assert.assertTrue(localSourceHandle.isBlocked().isDone());
+ Assert.assertFalse(localSourceHandle.isAborted());
+ Assert.assertFalse(localSourceHandle.isFinished());
+ Assert.assertEquals(mockTsBlockSize, localSourceHandle.getBufferRetainedSizeInBytes());
+
+ // Consume tsblocks.
+ Assert.assertTrue(localSourceHandle.isBlocked().isDone());
+ localSourceHandle.receive();
+ ListenableFuture<Void> blocked = localSourceHandle.isBlocked();
+ Assert.assertFalse(blocked.isDone());
+ Assert.assertFalse(localSourceHandle.isAborted());
+ Assert.assertTrue(localSourceHandle.isFinished());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(localSourceHandle);
+ }
+
+ @Test
+ public void testAbort() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L * 1024L;
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+ final String localPlanNodeId = "exchange_0";
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+
+ // Construct a mock LocalMemoryManager that do not block any reservation.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+ // Construct a mock SourceHandleListener.
+ SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
+ // Construct a shared tsblock queue.
+ SharedTsBlockQueue queue =
+ new SharedTsBlockQueue(localFragmentInstanceId, mockLocalMemoryManager);
+
+ LocalSourceHandle localSourceHandle =
+ new LocalSourceHandle(
+ remoteFragmentInstanceId,
+ localFragmentInstanceId,
+ localPlanNodeId,
+ queue,
+ mockSourceHandleListener);
+ ListenableFuture<Void> future = localSourceHandle.isBlocked();
+ Assert.assertFalse(future.isDone());
+ Assert.assertFalse(localSourceHandle.isAborted());
+ Assert.assertFalse(localSourceHandle.isFinished());
+ Assert.assertEquals(0L, localSourceHandle.getBufferRetainedSizeInBytes());
+
+ // Close the local source handle.
+ localSourceHandle.abort();
+ Assert.assertTrue(future.isDone());
+ Assert.assertTrue(localSourceHandle.isAborted());
+ Assert.assertFalse(localSourceHandle.isFinished());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onAborted(localSourceHandle);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueueTest.java
new file mode 100644
index 0000000000..5b79f10c41
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SharedTsBlockQueueTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.memory.MemoryPool;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SharedTsBlockQueueTest {
+ @Test(timeout = 5000L)
+ public void concurrencyTest() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L * 1024L;
+
+ // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool spyMemoryPool =
+ Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+ SharedTsBlockQueue queue =
+ new SharedTsBlockQueue(new TFragmentInstanceId(queryId, 0, "0"), mockLocalMemoryManager);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ AtomicReference<Integer> numOfTimesSenderBlocked = new AtomicReference<>(0);
+ AtomicReference<Integer> numOfTimesReceiverBlocked = new AtomicReference<>(0);
+ AtomicReference<Integer> numOfTsBlocksToSend = new AtomicReference<>(1000);
+ AtomicReference<Integer> numOfTsBlocksToReceive = new AtomicReference<>(1000);
+ executor.submit(
+ new SendTask(
+ queue, mockTsBlockSize, numOfTsBlocksToSend, numOfTimesSenderBlocked, executor));
+ executor.submit(
+ new ReceiveTask(queue, numOfTsBlocksToReceive, numOfTimesReceiverBlocked, executor));
+
+ while (numOfTsBlocksToSend.get() != 0 && numOfTsBlocksToReceive.get() != 0) {
+ String message =
+ String.format(
+ "Sender %d: %d, Receiver %d: %d",
+ numOfTimesSenderBlocked.get(),
+ numOfTsBlocksToSend.get(),
+ numOfTimesReceiverBlocked.get(),
+ numOfTsBlocksToReceive.get());
+ System.out.println(message);
+ try {
+ Thread.sleep(10L);
+ } catch (InterruptedException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ }
+
+ private static class SendTask implements Runnable {
+
+ private final SharedTsBlockQueue queue;
+ private final long mockTsBlockSize;
+ private final AtomicReference<Integer> numOfTsBlocksToSend;
+ private final AtomicReference<Integer> numOfTimesBlocked;
+ private final ExecutorService executor;
+
+ public SendTask(
+ SharedTsBlockQueue queue,
+ long mockTsBlockSize,
+ AtomicReference<Integer> numOfTsBlocksToSend,
+ AtomicReference<Integer> numOfTimesBlocked,
+ ExecutorService executor) {
+ this.queue = Validate.notNull(queue);
+ Validate.isTrue(mockTsBlockSize > 0L);
+ this.mockTsBlockSize = mockTsBlockSize;
+ this.numOfTsBlocksToSend = Validate.notNull(numOfTsBlocksToSend);
+ this.numOfTimesBlocked = Validate.notNull(numOfTimesBlocked);
+ this.executor = Validate.notNull(executor);
+ }
+
+ @Override
+ public void run() {
+ ListenableFuture<Void> blockedOnMemory = null;
+ while (numOfTsBlocksToSend.get() > 0) {
+ blockedOnMemory = queue.add(Utils.createMockTsBlock(mockTsBlockSize));
+ numOfTsBlocksToSend.updateAndGet(v -> v - 1);
+ if (!blockedOnMemory.isDone()) {
+ break;
+ }
+ }
+
+ if (blockedOnMemory != null) {
+ numOfTimesBlocked.updateAndGet(v -> v + 1);
+ blockedOnMemory.addListener(
+ new SendTask(queue, mockTsBlockSize, numOfTsBlocksToSend, numOfTimesBlocked, executor),
+ executor);
+ } else {
+ queue.setNoMoreTsBlocks(true);
+ }
+ }
+ }
+
+ private static class ReceiveTask implements Runnable {
+
+ private final SharedTsBlockQueue queue;
+ private final AtomicReference<Integer> numOfTsBlocksToReceive;
+ private final AtomicReference<Integer> numOfTimesBlocked;
+ private final ExecutorService executor;
+
+ public ReceiveTask(
+ SharedTsBlockQueue queue,
+ AtomicReference<Integer> numOfTsBlocksToReceive,
+ AtomicReference<Integer> numOfTimesBlocked,
+ ExecutorService executor) {
+ this.queue = Validate.notNull(queue);
+ this.numOfTsBlocksToReceive = Validate.notNull(numOfTsBlocksToReceive);
+ this.numOfTimesBlocked = Validate.notNull(numOfTimesBlocked);
+ this.executor = Validate.notNull(executor);
+ }
+
+ @Override
+ public void run() {
+ ListenableFuture<Void> blocked = null;
+ while (numOfTsBlocksToReceive.get() > 0) {
+ blocked = queue.isBlocked();
+ if (blocked.isDone()) {
+ queue.remove();
+ numOfTsBlocksToReceive.updateAndGet(v -> v - 1);
+ } else {
+ break;
+ }
+ }
+
+ if (blocked != null) {
+ numOfTimesBlocked.updateAndGet(v -> v + 1);
+ blocked.addListener(
+ new ReceiveTask(queue, numOfTsBlocksToReceive, numOfTimesBlocked, executor), executor);
+ }
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index c8c6c00329..ef5b83344a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -175,7 +175,7 @@ public class SourceHandleTest {
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
- // Construct a mock LocalMemoryManager with capacity 3 * mockTsBlockSize.
+ // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool spyMemoryPool =
Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
index b0f778d989..5b60925fe7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
@@ -46,6 +46,12 @@ public class Utils {
return mockTsBlocks;
}
+ public static TsBlock createMockTsBlock(long mockTsBlockSize) {
+ TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
+ Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
+ return mockTsBlock;
+ }
+
public static MemoryPool createMockBlockedMemoryPool(
String queryId, int numOfMockTsBlock, long mockTsBlockSize) {
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);