You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/04 01:17:46 UTC
[iotdb] branch master updated: [IOTDB-4017] Fix Error happened while calling onAcknowledgeDataBlockEvent (#6869)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5ef5936d56 [IOTDB-4017] Fix Error happened while calling onAcknowledgeDataBlockEvent (#6869)
5ef5936d56 is described below
commit 5ef5936d564bb401764e72001334c24681b85983
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu Aug 4 09:17:40 2022 +0800
[IOTDB-4017] Fix Error happened while calling onAcknowledgeDataBlockEvent (#6869)
---
.../iotdb/db/mpp/execution/driver/Driver.java | 3 +-
.../db/mpp/execution/exchange/ISinkHandle.java | 2 +-
.../db/mpp/execution/exchange/LocalSinkHandle.java | 10 ++--
.../db/mpp/execution/exchange/SinkHandle.java | 52 +++++++++-------
.../iotdb/db/mpp/execution/memory/MemoryPool.java | 2 +-
.../execution/exchange/LocalSinkHandleTest.java | 6 +-
.../db/mpp/execution/exchange/SinkHandleTest.java | 70 +++++++++++++---------
.../db/mpp/execution/exchange/StubSinkHandle.java | 4 +-
.../iotdb/db/mpp/execution/exchange/Utils.java | 11 +++-
9 files changed, 93 insertions(+), 67 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index acf8e89470..3462b0fc7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -186,7 +185,7 @@ public abstract class Driver implements IDriver {
if (root.hasNext()) {
TsBlock tsBlock = root.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
- sinkHandle.send(Collections.singletonList(tsBlock));
+ sinkHandle.send(tsBlock);
}
}
return NOT_BLOCKED;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 4450ca8c21..406ea5d625 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -41,7 +41,7 @@ public interface ISinkHandle {
* the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
* will be thrown if any exception happened during the data transmission.
*/
- void send(List<TsBlock> tsBlocks);
+ void send(TsBlock tsBlock);
/**
* Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index d6960e7664..aaba4b8d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -100,8 +100,8 @@ public class LocalSinkHandle implements ISinkHandle {
}
@Override
- public void send(List<TsBlock> tsBlocks) {
- Validate.notNull(tsBlocks, "tsBlocks is null");
+ public void send(TsBlock tsBlock) {
+ Validate.notNull(tsBlock, "tsBlocks is null");
synchronized (this) {
checkState();
if (!blocked.isDone()) {
@@ -113,11 +113,9 @@ public class LocalSinkHandle implements ISinkHandle {
if (queue.hasNoMoreTsBlocks()) {
return;
}
- logger.info("send TsBlocks. Size: {}", tsBlocks.size());
+ logger.info("send TsBlocks");
synchronized (this) {
- for (TsBlock tsBlock : tsBlocks) {
- blocked = queue.add(tsBlock);
- }
+ blocked = queue.add(tsBlock);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index e8247cc621..872720fb7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
@@ -46,9 +47,9 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
-import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SinkHandle implements ISinkHandle {
@@ -71,15 +72,19 @@ public class SinkHandle implements ISinkHandle {
// Use LinkedHashMap to meet 2 needs,
// 1. Predictable iteration order so that removing buffered tsblocks can be efficient.
// 2. Fast lookup.
- private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap<>();
+ private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock =
+ new LinkedHashMap<>();
+
+ // size for current TsBlock to reserve and free
+ private long currentTsBlockSize;
private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager;
- private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+ private volatile ListenableFuture<Void> blocked;
private int nextSequenceId = 0;
/** The actual buffered memory in bytes, including the amount of memory being reserved. */
- private long bufferRetainedSizeInBytes = 0;
+ private long bufferRetainedSizeInBytes;
private boolean aborted = false;
@@ -109,6 +114,12 @@ public class SinkHandle implements ISinkHandle {
this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
this.threadName = createFullIdFrom(localFragmentInstanceId, "SinkHandle");
+ this.blocked =
+ localMemoryManager
+ .getQueryPool()
+ .reserve(localFragmentInstanceId.getQueryId(), DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
@Override
@@ -122,8 +133,8 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public synchronized void send(List<TsBlock> tsBlocks) {
- Validate.notNull(tsBlocks, "tsBlocks is null");
+ public synchronized void send(TsBlock tsBlock) {
+ Validate.notNull(tsBlock, "tsBlocks is null");
checkState();
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
@@ -131,10 +142,7 @@ public class SinkHandle implements ISinkHandle {
if (noMoreTsBlocks) {
return;
}
- long retainedSizeInBytes = 0L;
- for (TsBlock tsBlock : tsBlocks) {
- retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
- }
+ long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
int startSequenceId;
List<Long> tsBlockSizes = new ArrayList<>();
startSequenceId = nextSequenceId;
@@ -143,12 +151,13 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
bufferRetainedSizeInBytes += retainedSizeInBytes;
- for (TsBlock tsBlock : tsBlocks) {
- sequenceIdToTsBlock.put(nextSequenceId, tsBlock);
- nextSequenceId += 1;
- }
+
+ sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
+ nextSequenceId += 1;
+ currentTsBlockSize = retainedSizeInBytes;
+
for (int i = startSequenceId; i < nextSequenceId; i++) {
- tsBlockSizes.add(sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
+ tsBlockSizes.add(sequenceIdToTsBlock.get(i).left.getRetainedSizeInBytes());
}
// TODO: consider merge multiple NewDataBlockEvent for less network traffic.
@@ -198,6 +207,7 @@ public class SinkHandle implements ISinkHandle {
}
logger.info("set noMoreTsBlocks to true");
noMoreTsBlocks = true;
+
if (isFinished()) {
logger.info("revoke onFinish() of sinkHandleListener");
sinkHandleListener.onFinish(this);
@@ -263,7 +273,7 @@ public class SinkHandle implements ISinkHandle {
ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
TsBlock tsBlock;
- tsBlock = sequenceIdToTsBlock.get(sequenceId);
+ tsBlock = sequenceIdToTsBlock.get(sequenceId).left;
if (tsBlock == null) {
throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
}
@@ -276,17 +286,19 @@ public class SinkHandle implements ISinkHandle {
if (aborted) {
return;
}
- Iterator<Entry<Integer, TsBlock>> iterator = sequenceIdToTsBlock.entrySet().iterator();
+ Iterator<Entry<Integer, Pair<TsBlock, Long>>> iterator =
+ sequenceIdToTsBlock.entrySet().iterator();
while (iterator.hasNext()) {
- Entry<Integer, TsBlock> entry = iterator.next();
+ Entry<Integer, Pair<TsBlock, Long>> entry = iterator.next();
if (entry.getKey() < startSequenceId) {
continue;
}
if (entry.getKey() >= endSequenceId) {
break;
}
- freedBytes += entry.getValue().getRetainedSizeInBytes();
- bufferRetainedSizeInBytes -= entry.getValue().getRetainedSizeInBytes();
+
+ freedBytes += entry.getValue().right;
+ bufferRetainedSizeInBytes -= entry.getValue().right;
iterator.remove();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index e3f4d0fc66..7b24d32c13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -195,7 +195,7 @@ public class MemoryPool {
Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
while (iterator.hasNext()) {
MemoryReservationFuture<Void> future = iterator.next();
- if (future.isCancelled()) {
+ if (future.isCancelled() || future.isDone()) {
continue;
}
long bytesToReserve = future.getBytes();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 422139a959..1c3cb4e659 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -29,8 +29,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.Collections;
-
public class LocalSinkHandleTest {
@Test
public void testSend() {
@@ -68,7 +66,7 @@ public class LocalSinkHandleTest {
// Send TsBlocks.
int numOfSentTsblocks = 0;
while (localSinkHandle.isFull().isDone()) {
- localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+ localSinkHandle.send(Utils.createMockTsBlock(mockTsBlockSize));
numOfSentTsblocks += 1;
}
Assert.assertEquals(6, numOfSentTsblocks);
@@ -133,7 +131,7 @@ public class LocalSinkHandleTest {
// Send TsBlocks.
int numOfSentTsblocks = 0;
while (localSinkHandle.isFull().isDone()) {
- localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+ localSinkHandle.send(Utils.createMockTsBlock(mockTsBlockSize));
numOfSentTsblocks += 1;
}
Assert.assertEquals(6, numOfSentTsblocks);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index f8110fcf1d..411370c14a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -41,13 +41,15 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public class SinkHandleTest {
@Test
public void testOneTimeNotBlockedSend() {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
- final int numOfMockTsBlock = 10;
+ final int numOfMockTsBlock = 1;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -95,18 +97,20 @@ public class SinkHandleTest {
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- sinkHandle.send(mockTsBlocks);
+ sinkHandle.send(mockTsBlocks.get(0));
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
- mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+ mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
- Mockito.verify(mockMemoryPool, Mockito.times(1))
+ Mockito.verify(mockMemoryPool, Mockito.times(2))
.reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
try {
Thread.sleep(100L);
@@ -148,7 +152,7 @@ public class SinkHandleTest {
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertTrue(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(mockTsBlockSize, sinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockMemoryPool, Mockito.times(1))
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
@@ -173,7 +177,7 @@ public class SinkHandleTest {
public void testMultiTimesBlockedSend() {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
- final int numOfMockTsBlock = 10;
+ final int numOfMockTsBlock = 1;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -223,18 +227,20 @@ public class SinkHandleTest {
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- sinkHandle.send(mockTsBlocks);
+ sinkHandle.send(mockTsBlocks.get(0));
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
- mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+ mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
- Mockito.verify(mockMemoryPool, Mockito.times(1))
+ Mockito.verify(mockMemoryPool, Mockito.times(2))
.reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
try {
Thread.sleep(100L);
@@ -269,19 +275,21 @@ public class SinkHandleTest {
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockMemoryPool, Mockito.times(1))
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
// Send tsblocks.
- sinkHandle.send(mockTsBlocks);
+ sinkHandle.send(mockTsBlocks.get(0));
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
- mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+ mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
- Mockito.verify(mockMemoryPool, Mockito.times(2))
+ Mockito.verify(mockMemoryPool, Mockito.times(3))
.reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
try {
Thread.sleep(100L);
@@ -334,7 +342,8 @@ public class SinkHandleTest {
sinkHandle.acknowledgeTsBlock(numOfMockTsBlock, numOfMockTsBlock * 2);
Assert.assertTrue(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockMemoryPool, Mockito.times(2))
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
@@ -344,7 +353,7 @@ public class SinkHandleTest {
public void testFailedSend() {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
- final int numOfMockTsBlock = 10;
+ final int numOfMockTsBlock = 1;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -395,18 +404,20 @@ public class SinkHandleTest {
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- sinkHandle.send(mockTsBlocks);
+ sinkHandle.send(mockTsBlocks.get(0));
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
- mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+ mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
- Mockito.verify(mockMemoryPool, Mockito.times(1))
+ Mockito.verify(mockMemoryPool, Mockito.times(2))
.reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
try {
Thread.sleep(100L);
@@ -446,7 +457,7 @@ public class SinkHandleTest {
public void testAbort() {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
- final int numOfMockTsBlock = 10;
+ final int numOfMockTsBlock = 1;
final TEndPoint remoteEndpoint =
new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -456,7 +467,9 @@ public class SinkHandleTest {
// Construct a mock LocalMemoryManager that returns blocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
MemoryPool spyMemoryPool =
- Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 10 * mockTsBlockSize));
+ Mockito.spy(
+ new MemoryPool(
+ "test", numOfMockTsBlock * mockTsBlockSize, numOfMockTsBlock * mockTsBlockSize));
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
// Construct a mock SinkHandleListener.
@@ -496,20 +509,21 @@ public class SinkHandleTest {
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
- Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- sinkHandle.send(mockTsBlocks);
- sinkHandle.send(mockTsBlocks);
+ sinkHandle.send(mockTsBlocks.get(0));
Future<?> blocked = sinkHandle.isFull();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(blocked.isCancelled());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
- 2 * mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
- Assert.assertEquals(2 * numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
+ mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
sinkHandle.abort();
Assert.assertTrue(blocked.isDone());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index d68ad6726a..2930abe48b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -59,8 +59,8 @@ public class StubSinkHandle implements ISinkHandle {
}
@Override
- public void send(List<TsBlock> tsBlocks) {
- this.tsBlocks.addAll(tsBlocks);
+ public void send(TsBlock tsBlock) {
+ this.tsBlocks.add(tsBlock);
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
index 211f635636..165f0e65a9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
@@ -54,6 +54,7 @@ public class Utils {
public static MemoryPool createMockBlockedMemoryPool(
String queryId, int numOfMockTsBlock, long mockTsBlockSize) {
+ long capacityInBytes = numOfMockTsBlock * mockTsBlockSize;
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
AtomicReference<SettableFuture<Void>> settableFuture = new AtomicReference<>();
settableFuture.set(SettableFuture.create());
@@ -62,8 +63,12 @@ public class Utils {
Mockito.when(mockMemoryPool.reserve(Mockito.eq(queryId), Mockito.anyLong()))
.thenAnswer(
invocation -> {
- reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(1));
- settableFuture.set(SettableFuture.create());
+ long bytesToReserve = invocation.getArgument(1);
+ if (reservedBytes.get() + bytesToReserve <= capacityInBytes) {
+ reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(1));
+ } else {
+ settableFuture.set(SettableFuture.create());
+ }
return settableFuture.get();
});
Mockito.doAnswer(
@@ -72,12 +77,12 @@ public class Utils {
reservedBytes.updateAndGet(v -> v - (long) invocation.getArgument(1));
if (reservedBytes.get() <= 0) {
settableFuture.get().set(null);
+ reservedBytes.updateAndGet(v -> v + mockTsBlockSize);
}
return null;
})
.when(mockMemoryPool)
.free(Mockito.eq(queryId), Mockito.anyLong());
- long capacityInBytes = numOfMockTsBlock * mockTsBlockSize;
Mockito.when(mockMemoryPool.tryReserve(Mockito.eq(queryId), Mockito.anyLong()))
.thenAnswer(
invocation -> {