You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/09 11:20:00 UTC
[iotdb] branch MemoryControl updated: Add UT
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/MemoryControl by this push:
new ff861dd17d Add UT
ff861dd17d is described below
commit ff861dd17d70adae894f80fde3a30a85fae0cd63
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Aug 9 19:19:51 2022 +0800
Add UT
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +-
.../db/mpp/exception/MemoryNotEnoughException.java | 28 ++
.../fragment/FragmentInstanceContext.java | 4 +
.../process/last/LastQueryMergeOperator.java | 6 +-
.../process/last/LastQuerySortOperator.java | 7 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 47 ++-
.../mpp/execution/operator/OperatorMemoryTest.java | 424 +++++++++++++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 +
8 files changed, 515 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 72f48793dd..e6c7825969 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1501,8 +1501,12 @@ public class IoTDBDescriptor {
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
conf.setAllocateMemoryForTimeSeriesMetaDataCache(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
- conf.setAllocateMemoryForReadWithoutCache(
+ conf.setAllocateMemoryForCoordinator(
maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
+ conf.setAllocateMemoryForOperators(
+ maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum);
+ conf.setAllocateMemoryForDataExchange(
+ maxMemoryAvailable * Integer.parseInt(proportions[5].trim()) / proportionSum);
} catch (Exception e) {
throw new RuntimeException(
"Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion"
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
new file mode 100644
index 0000000000..7c5f4803e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.exception;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+
+public class MemoryNotEnoughException extends IoTDBException {
+
+ public MemoryNotEnoughException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index aff2d66a41..4d0f9d9ffd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -179,4 +179,8 @@ public class FragmentInstanceContext extends QueryContext {
public long getEndTime() {
return executionEndTime.get();
}
+
+ public FragmentInstanceStateMachine getStateMachine() {
+ return stateMachine;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 9d8be1e23d..af6e7b4bf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -42,7 +42,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU
// time-series
public class LastQueryMergeOperator implements ProcessOperator {
- private static final int MAP_NODE_RETRAINED_SIZE = 16;
+ public static final long MAP_NODE_RETRAINED_SIZE = 16L + Location.INSTANCE_SIZE;
private final OperatorContext operatorContext;
@@ -229,7 +229,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
long maxPeekMemory =
calculateMaxReturnSize()
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
- * (Location.INSTANCE_SIZE + MAP_NODE_RETRAINED_SIZE);
+ * MAP_NODE_RETRAINED_SIZE;
long childrenMaxPeekMemory = 0;
for (Operator child : children) {
maxPeekMemory += child.calculateMaxReturnSize();
@@ -270,7 +270,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
private static class Location {
- private static final long INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
int tsBlockIndex;
int rowIndex;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 1b56d844c1..e40da67999 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -193,13 +193,14 @@ public class LastQuerySortOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
- long maxPeekMemory =
- DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlockBuilder.getRetainedSizeInBytes();
+ long maxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + cachedTsBlock.getRetainedSizeInBytes();
long maxChildrenReturnSize = 0;
+ long maxChildrenPeekMemory = 0;
for (Operator child : children) {
maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize());
+ maxChildrenPeekMemory = Math.max(maxChildrenPeekMemory, child.calculateMaxPeekMemory());
}
- return maxPeekMemory;
+ return Math.max(maxPeekMemory + maxChildrenReturnSize, maxChildrenPeekMemory);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 8188612e61..451cc208d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -18,17 +18,22 @@
*/
package org.apache.iotdb.db.mpp.plan.planner;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
/**
@@ -38,6 +43,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
*/
public class LocalExecutionPlanner {
+ /** remaining free memory that fragment instances can still reserve for operator execution */
+ private long freeMemoryForOperators =
+ IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
+
public static LocalExecutionPlanner getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -47,11 +56,15 @@ public class LocalExecutionPlanner {
TypeProvider types,
FragmentInstanceContext instanceContext,
Filter timeFilter,
- DataRegion dataRegion) {
+ DataRegion dataRegion)
+ throws MemoryNotEnoughException {
LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext);
Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ // check whether current free memory is enough to execute current query
+ checkMemory(root, instanceContext.getStateMachine());
+
ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
instanceContext
.getOperatorContexts()
@@ -71,7 +84,8 @@ public class LocalExecutionPlanner {
}
public SchemaDriver plan(
- PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion) {
+ PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion)
+ throws MemoryNotEnoughException {
SchemaDriverContext schemaDriverContext =
new SchemaDriverContext(instanceContext, schemaRegion);
@@ -81,6 +95,9 @@ public class LocalExecutionPlanner {
Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ // check whether current free memory is enough to execute current query
+ checkMemory(root, instanceContext.getStateMachine());
+
ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
instanceContext
.getOperatorContexts()
@@ -91,6 +108,32 @@ public class LocalExecutionPlanner {
return new SchemaDriver(root, context.getSinkHandle(), schemaDriverContext);
}
+ private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachine)
+     throws MemoryNotEnoughException {
+   // Reserve the estimated peak memory of this operator tree; fail fast when it does not fit.
+   long estimatedMemorySize = root.calculateMaxPeekMemory();
+   synchronized (this) {
+     if (estimatedMemorySize > freeMemoryForOperators) {
+       throw new MemoryNotEnoughException(
+           String.format(
+               "There is not enough memory to execute current fragment instance, current remaining free memory is %d, estimated memory usage for current fragment instance is %d",
+               freeMemoryForOperators, estimatedMemorySize),
+           TSStatusCode.MEMORY_NOT_ENOUGH.getStatusCode());
+     }
+     freeMemoryForOperators -= estimatedMemorySize;
+   }
+   // The condition below may match more than once (FLUSHING, then a done state): release once.
+   boolean[] released = {false};
+   stateMachine.addStateChangeListener(newState -> {
+         synchronized (this) {
+           if (!released[0] && (newState == FragmentInstanceState.FLUSHING || newState.isDone())) {
+             released[0] = true;
+             freeMemoryForOperators += estimatedMemorySize;
+           }
+         }
+       });
+ }
+
private static class InstanceHolder {
private InstanceHolder() {}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
new file mode 100644
index 0000000000..b57ee14dcf
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OperatorMemoryTest {
+
+ @Test
+ public void seriesScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ MeasurementPath measurementPath =
+ new MeasurementPath("root.SeriesScanOperatorTest.device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+ SeriesScanOperator seriesScanOperator =
+ new SeriesScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+
+ assertEquals(
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxPeekMemory());
+ assertEquals(
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxReturnSize());
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void alignedSeriesScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ AlignedPath alignedPath =
+ new AlignedPath(
+ "root.AlignedSeriesScanOperatorTest.device0",
+ Arrays.asList("sensor0", "sensor1", "sensor2"));
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+ AlignedSeriesScanOperator seriesScanOperator =
+ new AlignedSeriesScanOperator(
+ planNodeId,
+ alignedPath,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+
+ assertEquals(
+ 4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxPeekMemory());
+ assertEquals(
+ 4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxReturnSize());
+
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void exchangeOperatorTest() {
+ ExchangeOperator exchangeOperator = new ExchangeOperator(null, null, null);
+
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxPeekMemory());
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void lastCacheScanOperatorTest() {
+ TsBlock tsBlock = Mockito.mock(TsBlock.class);
+ Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(1024L);
+ LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(null, null, tsBlock);
+
+ assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory());
+ assertEquals(1024, lastCacheScanOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void fillOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+ FillOperator fillOperator =
+ new FillOperator(Mockito.mock(OperatorContext.class), new IFill[] {null, null}, child);
+
+ assertEquals(2048 * 2, fillOperator.calculateMaxPeekMemory());
+ assertEquals(1024, fillOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void lastQueryCollectOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ Random random = new Random();
+ long expectedMaxPeekMemory = 0;
+ long expectedMaxReturnSize = 0;
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ long currentMaxPeekMemory = random.nextInt(1024) + 1024;
+ long currentMaxReturnSize = random.nextInt(1024);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+ children.add(child);
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
+ expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
+ }
+ LastQueryCollectOperator lastQueryCollectOperator =
+ new LastQueryCollectOperator(Mockito.mock(OperatorContext.class), children);
+
+ assertEquals(expectedMaxPeekMemory, lastQueryCollectOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, lastQueryCollectOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void lastQueryMergeOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ Random random = new Random();
+ long expectedMaxPeekMemory = 0;
+ long expectedMaxReturnSize = 0;
+ long childSumReturnSize = 0;
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ long currentMaxPeekMemory = random.nextInt(1024) + 1024;
+ long currentMaxReturnSize = random.nextInt(1024);
+ childSumReturnSize += currentMaxReturnSize;
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+ children.add(child);
+ expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
+ }
+ // We need to cache all the TsBlocks of the children and then return a new TsBlock as the result,
+ // whose max possible size should be equal to the max return size among all its children; we
+ // should also take the TreeMap memory into account.
+ expectedMaxPeekMemory =
+ Math.max(
+ expectedMaxPeekMemory,
+ childSumReturnSize
+ + expectedMaxReturnSize
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE);
+
+ LastQueryMergeOperator lastQueryMergeOperator =
+ new LastQueryMergeOperator(
+ Mockito.mock(OperatorContext.class), children, Comparator.naturalOrder());
+
+ assertEquals(expectedMaxPeekMemory, lastQueryMergeOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, lastQueryMergeOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void lastQueryOperatorTest() {
+ TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
+ Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
+ List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+ long expectedMaxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ for (int i = 0; i < 4; i++) {
+ UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ children.add(child);
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, 2 * 1024 * 1024L);
+ expectedMaxReturnSize = Math.max(expectedMaxReturnSize, 1024L);
+ }
+ LastQueryOperator lastQueryOperator =
+ new LastQueryOperator(Mockito.mock(OperatorContext.class), children, builder);
+
+ assertEquals(expectedMaxPeekMemory, lastQueryOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, lastQueryOperator.calculateMaxReturnSize());
+
+ Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(4 * 1024 * 1024L);
+ assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxPeekMemory());
+ assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void lastQuerySortOperatorTest() {
+ TsBlock tsBlock = Mockito.mock(TsBlock.class);
+ Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
+ Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
+ List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+ long expectedMaxPeekMemory =
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlock.getRetainedSizeInBytes();
+ for (int i = 0; i < 4; i++) {
+ UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ children.add(child);
+ }
+
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory + 1024, 2 * 1024);
+
+ LastQuerySortOperator lastQuerySortOperator =
+ new LastQuerySortOperator(
+ Mockito.mock(OperatorContext.class), tsBlock, children, Comparator.naturalOrder());
+
+ assertEquals(expectedMaxPeekMemory, lastQuerySortOperator.calculateMaxPeekMemory());
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, lastQuerySortOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void limitOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+ LimitOperator limitOperator =
+ new LimitOperator(Mockito.mock(OperatorContext.class), 100, child);
+
+ assertEquals(2 * 1024L, limitOperator.calculateMaxPeekMemory());
+ assertEquals(1024, limitOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void offsetOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+ OffsetOperator offsetOperator =
+ new OffsetOperator(Mockito.mock(OperatorContext.class), 100, child);
+
+ assertEquals(2 * 1024L, offsetOperator.calculateMaxPeekMemory());
+ assertEquals(1024, offsetOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void rowBasedTimeJoinOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ List<TSDataType> dataTypeList = new ArrayList<>(2);
+ dataTypeList.add(TSDataType.INT32);
+ dataTypeList.add(TSDataType.INT32);
+ long expectedMaxReturnSize =
+ 3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedMaxPeekMemory = expectedMaxReturnSize;
+ long childrenMaxPeekMemory = 0;
+
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+ expectedMaxPeekMemory += 64 * 1024L;
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ children.add(child);
+ }
+
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+ RowBasedTimeJoinOperator rowBasedTimeJoinOperator =
+ new RowBasedTimeJoinOperator(
+ Mockito.mock(OperatorContext.class), children, Ordering.ASC, dataTypeList, null, null);
+
+ assertEquals(expectedMaxPeekMemory, rowBasedTimeJoinOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, rowBasedTimeJoinOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void sortOperatorTest() {
+ SortOperator sortOperator = new SortOperator();
+ assertEquals(0, sortOperator.calculateMaxPeekMemory());
+ assertEquals(0, sortOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void timeJoinOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ List<TSDataType> dataTypeList = new ArrayList<>(2);
+ dataTypeList.add(TSDataType.INT32);
+ dataTypeList.add(TSDataType.INT32);
+ long expectedMaxReturnSize =
+ 3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedMaxPeekMemory = expectedMaxReturnSize;
+ long childrenMaxPeekMemory = 0;
+
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+ expectedMaxPeekMemory += 64 * 1024L;
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ children.add(child);
+ }
+
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+ TimeJoinOperator timeJoinOperator =
+ new TimeJoinOperator(
+ Mockito.mock(OperatorContext.class), children, Ordering.ASC, dataTypeList, null, null);
+
+ assertEquals(expectedMaxPeekMemory, timeJoinOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, timeJoinOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void updateLastCacheOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+ UpdateLastCacheOperator updateLastCacheOperator =
+ new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, true);
+
+ assertEquals(2048, updateLastCacheOperator.calculateMaxPeekMemory());
+ assertEquals(1024, updateLastCacheOperator.calculateMaxReturnSize());
+ }
+
+ @Test
+ public void linearFillOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+ LinearFillOperator linearFillOperator =
+ new LinearFillOperator(
+ Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child);
+
+ assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
+ assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+ }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b455746b09..291cf0c46d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -92,6 +92,8 @@ public enum TSStatusCode {
UNSUPPORTED_INDEX_FUNC_ERROR(421),
UNSUPPORTED_INDEX_TYPE_ERROR(422),
+ MEMORY_NOT_ENOUGH(423),
+
INTERNAL_SERVER_ERROR(500),
CLOSE_OPERATION_ERROR(501),
READ_ONLY_SYSTEM_ERROR(502),