You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/14 12:33:06 UTC
[iotdb] 01/06: merge master
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit acd9559516bf27aa2b9284d2cf2ef9c421c7cb27
Merge: e64b5c5679 bb9d2ff8c7
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Fri Aug 12 09:05:08 2022 +0800
merge master
.../resources/conf/iotdb-confignode.properties | 4 +-
.../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +-
.../iotdb/confignode/manager/ConfigManager.java | 22 +-
.../iotdb/confignode/manager/PartitionManager.java | 33 ++-
.../iotdb/confignode/manager/load/LoadManager.java | 28 +-
.../manager/load/balancer/RouteBalancer.java | 4 +
.../partition/GreedyPartitionAllocator.java | 25 +-
.../load/balancer/router/LazyGreedyRouter.java | 4 +
.../persistence/partition/PartitionInfo.java | 47 ++--
.../partition/StorageGroupPartitionTable.java | 40 ++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 3 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 284 +------------------
.../Maintenance-Tools/Maintenance-Command.md | 192 ++++++-------
.../Maintenance-Tools/Maintenance-Command.md | 191 +++++++------
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../db/it/IoTDBClusterPartitionTableTest.java | 308 +++++++++++++++++++++
.../integration/IoTDBManageTsFileResourceIT.java | 7 +-
.../iotdb/commons/cluster/RegionRoleType.java | 37 +++
.../commons/partition/DataPartitionTable.java | 21 ++
.../commons/partition/SeriesPartitionTable.java | 22 ++
.../resources/conf/iotdb-datanode.properties | 14 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 39 ++-
.../iotdb/db/mpp/common/header/HeaderConstant.java | 4 +-
.../operator/process/DeviceMergeOperator.java | 34 +++
.../operator/process/DeviceViewOperator.java | 28 ++
.../operator/process/FilterAndProjectOperator.java | 108 ++++++++
.../execution/config/metadata/ShowRegionTask.java | 1 +
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 6 +
.../column/multi/MappableUDFColumnTransformer.java | 4 +
.../column/ternary/TernaryColumnTransformer.java | 12 +
.../dag/column/unary/UnaryColumnTransformer.java | 4 +
.../iotdb/db/rescon/TsFileResourceManager.java | 3 +-
.../mpp/execution/operator/OperatorMemoryTest.java | 152 ++++++++++
.../iotdb/db/rescon/ResourceManagerTest.java | 10 +-
.../src/main/thrift/confignode.thrift | 1 +
37 files changed, 1177 insertions(+), 572 deletions(-)
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 67be7fd353,c718942108..8f2d891c95
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@@ -46,12 -48,15 +50,18 @@@ import org.apache.iotdb.db.mpp.executio
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+ import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@@ -473,63 -482,141 +487,201 @@@ public class OperatorMemoryTest
assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
}
+ @Test
+ public void deviceMergeOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ List<TSDataType> dataTypeList = new ArrayList<>(2);
+ dataTypeList.add(TSDataType.INT32);
+ dataTypeList.add(TSDataType.INT32);
+ List<String> devices = new ArrayList<>(4);
+ devices.add("device1");
+ devices.add("device2");
+ devices.add("device3");
+ devices.add("device4");
+ long expectedMaxReturnSize =
+ 3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedMaxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedRetainedSizeAfterCallingNext = 0;
+ long childrenMaxPeekMemory = 0;
+
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L);
+ expectedMaxPeekMemory += 128 * 1024L;
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ expectedRetainedSizeAfterCallingNext += 128 * 1024L;
+ children.add(child);
+ }
+
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+ DeviceMergeOperator deviceMergeOperator =
+ new DeviceMergeOperator(
+ Mockito.mock(OperatorContext.class),
+ devices,
+ children,
+ dataTypeList,
+ Mockito.mock(TimeSelector.class),
+ Mockito.mock(TimeComparator.class));
+
+ assertEquals(expectedMaxPeekMemory, deviceMergeOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, deviceMergeOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedRetainedSizeAfterCallingNext - 64 * 1024L,
+ deviceMergeOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void deviceViewOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ List<TSDataType> dataTypeList = new ArrayList<>(2);
+ dataTypeList.add(TSDataType.INT32);
+ dataTypeList.add(TSDataType.INT32);
+ List<String> devices = new ArrayList<>(4);
+ devices.add("device1");
+ devices.add("device2");
+ devices.add("device3");
+ devices.add("device4");
+ long expectedMaxReturnSize =
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedMaxPeekMemory = expectedMaxReturnSize;
+ long expectedRetainedSizeAfterCallingNext = 0;
+ long childrenMaxPeekMemory = 0;
+
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(1024L);
+ expectedMaxPeekMemory += 1024L;
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ expectedRetainedSizeAfterCallingNext += 1024L;
+ children.add(child);
+ }
+
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+ DeviceViewOperator deviceViewOperator =
+ new DeviceViewOperator(
+ Mockito.mock(OperatorContext.class),
+ devices,
+ children,
+ new ArrayList<>(),
+ dataTypeList);
+
+ assertEquals(expectedMaxPeekMemory, deviceViewOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, deviceViewOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedRetainedSizeAfterCallingNext,
+ deviceViewOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void filterAndProjectOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+ BooleanType booleanType = Mockito.mock(BooleanType.class);
+ Mockito.when(booleanType.getTypeEnum()).thenReturn(TypeEnum.BOOLEAN);
+ LongType longType = Mockito.mock(LongType.class);
+ Mockito.when(longType.getTypeEnum()).thenReturn(TypeEnum.INT64);
+ ColumnTransformer filterColumnTransformer =
+ new CompareLessEqualColumnTransformer(
+ booleanType,
+ new TimeColumnTransformer(longType),
+ new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class)));
+ List<TSDataType> filterOutputTypes = new ArrayList<>();
+ filterOutputTypes.add(TSDataType.INT32);
+ filterOutputTypes.add(TSDataType.INT64);
+ List<ColumnTransformer> projectColumnTransformers = new ArrayList<>();
+ projectColumnTransformers.add(
+ new ArithmeticAdditionColumnTransformer(
+ booleanType,
+ new TimeColumnTransformer(longType),
+ new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class))));
+
+ FilterAndProjectOperator operator =
+ new FilterAndProjectOperator(
+ Mockito.mock(OperatorContext.class),
+ child,
+ filterOutputTypes,
+ new ArrayList<>(),
+ filterColumnTransformer,
+ new ArrayList<>(),
+ new ArrayList<>(),
+ projectColumnTransformers,
+ false,
+ true);
+
+ assertEquals(
+ 4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + 512L,
+ operator.calculateMaxPeekMemory());
+ assertEquals(
+ 2 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ operator.calculateMaxReturnSize());
+ assertEquals(512, operator.calculateRetainedSizeAfterCallingNext());
+ }
++
+ @Test
+ public void seriesAggregationScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ MeasurementPath measurementPath =
+ new MeasurementPath(
+ "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0");
+
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName());
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator =
+ new SeriesAggregationScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ Arrays.asList(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.COUNT, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MAX_VALUE, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MIN_TIME, TSDataType.INT32, true),
+ AggregationStep.SINGLE)),
+ null,
+ true,
+ null);
+
+ assertEquals(
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxPeekMemory());
+ assertEquals(
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxReturnSize());
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}