You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/14 12:33:06 UTC

[iotdb] 01/06: merge master

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit acd9559516bf27aa2b9284d2cf2ef9c421c7cb27
Merge: e64b5c5679 bb9d2ff8c7
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Fri Aug 12 09:05:08 2022 +0800

    merge master

 .../resources/conf/iotdb-confignode.properties     |   4 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   2 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  22 +-
 .../iotdb/confignode/manager/PartitionManager.java |  33 ++-
 .../iotdb/confignode/manager/load/LoadManager.java |  28 +-
 .../manager/load/balancer/RouteBalancer.java       |   4 +
 .../partition/GreedyPartitionAllocator.java        |  25 +-
 .../load/balancer/router/LazyGreedyRouter.java     |   4 +
 .../persistence/partition/PartitionInfo.java       |  47 ++--
 .../partition/StorageGroupPartitionTable.java      |  40 ++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   3 +-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  | 284 +------------------
 .../Maintenance-Tools/Maintenance-Command.md       | 192 ++++++-------
 .../Maintenance-Tools/Maintenance-Command.md       | 191 +++++++------
 .../java/org/apache/iotdb/it/env/MppConfig.java    |   7 +
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +
 .../db/it/IoTDBClusterPartitionTableTest.java      | 308 +++++++++++++++++++++
 .../integration/IoTDBManageTsFileResourceIT.java   |   7 +-
 .../iotdb/commons/cluster/RegionRoleType.java      |  37 +++
 .../commons/partition/DataPartitionTable.java      |  21 ++
 .../commons/partition/SeriesPartitionTable.java    |  22 ++
 .../resources/conf/iotdb-datanode.properties       |  14 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  40 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  39 ++-
 .../iotdb/db/mpp/common/header/HeaderConstant.java |   4 +-
 .../operator/process/DeviceMergeOperator.java      |  34 +++
 .../operator/process/DeviceViewOperator.java       |  28 ++
 .../operator/process/FilterAndProjectOperator.java | 108 ++++++++
 .../execution/config/metadata/ShowRegionTask.java  |   1 +
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   6 +
 .../column/multi/MappableUDFColumnTransformer.java |   4 +
 .../column/ternary/TernaryColumnTransformer.java   |  12 +
 .../dag/column/unary/UnaryColumnTransformer.java   |   4 +
 .../iotdb/db/rescon/TsFileResourceManager.java     |   3 +-
 .../mpp/execution/operator/OperatorMemoryTest.java | 152 ++++++++++
 .../iotdb/db/rescon/ResourceManagerTest.java       |  10 +-
 .../src/main/thrift/confignode.thrift              |   1 +
 37 files changed, 1177 insertions(+), 572 deletions(-)

diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 67be7fd353,c718942108..8f2d891c95
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@@ -46,12 -48,15 +50,18 @@@ import org.apache.iotdb.db.mpp.executio
  import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
  import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
  import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 +import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
  import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
  import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
  import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
+ import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+ import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 +import org.apache.iotdb.db.query.aggregation.AggregationType;
  import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@@ -473,63 -482,141 +487,201 @@@ public class OperatorMemoryTest 
      assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
    }
  
+   @Test
+   public void deviceMergeOperatorTest() { // Checks DeviceMergeOperator's three memory estimates using four mocked children.
+     List<Operator> children = new ArrayList<>(4);
+     List<TSDataType> dataTypeList = new ArrayList<>(2);
+     dataTypeList.add(TSDataType.INT32);
+     dataTypeList.add(TSDataType.INT32);
+     List<String> devices = new ArrayList<>(4);
+     devices.add("device1");
+     devices.add("device2");
+     devices.add("device3");
+     devices.add("device4");
+     long expectedMaxReturnSize =
+         3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); // expected return size: three times the configured page size
+     long expectedMaxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); // starts at one page; the children's peaks are added in the loop below
+     long expectedRetainedSizeAfterCallingNext = 0;
+     long childrenMaxPeekMemory = 0;
+ 
+     for (int i = 0; i < 4; i++) {
+       Operator child = Mockito.mock(Operator.class);
+       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); // every child reports a 128 KiB peak
+       Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); // ... a 64 KiB return size
+       Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L); // ... and 64 KiB retained after next()
+       expectedMaxPeekMemory += 128 * 1024L; // same value as the stubbed child peak
+       childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+       expectedRetainedSizeAfterCallingNext += 128 * 1024L; // NOTE(review): presumably the child's return size plus its retained size (64 KiB each) - confirm against DeviceMergeOperator
+       children.add(child);
+     }
+ 
+     expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory); // larger of the accumulated total and the biggest single-child peak
+ 
+     DeviceMergeOperator deviceMergeOperator =
+         new DeviceMergeOperator(
+             Mockito.mock(OperatorContext.class),
+             devices,
+             children,
+             dataTypeList,
+             Mockito.mock(TimeSelector.class),
+             Mockito.mock(TimeComparator.class));
+ 
+     assertEquals(expectedMaxPeekMemory, deviceMergeOperator.calculateMaxPeekMemory());
+     assertEquals(expectedMaxReturnSize, deviceMergeOperator.calculateMaxReturnSize());
+     assertEquals(
+         expectedRetainedSizeAfterCallingNext - 64 * 1024L, // accumulated total minus one 64 KiB share; NOTE(review): the reason is not visible in this test - confirm against the operator
+         deviceMergeOperator.calculateRetainedSizeAfterCallingNext());
+   }
+ 
+   @Test
+   public void deviceViewOperatorTest() { // Checks DeviceViewOperator's three memory estimates using four mocked children.
+     List<Operator> children = new ArrayList<>(4);
+     List<TSDataType> dataTypeList = new ArrayList<>(2);
+     dataTypeList.add(TSDataType.INT32);
+     dataTypeList.add(TSDataType.INT32);
+     List<String> devices = new ArrayList<>(4);
+     devices.add("device1");
+     devices.add("device2");
+     devices.add("device3");
+     devices.add("device4");
+     long expectedMaxReturnSize =
+         2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); // expected return size: twice the configured page size
+     long expectedMaxPeekMemory = expectedMaxReturnSize; // peak estimate starts from the return size; child peaks are added in the loop
+     long expectedRetainedSizeAfterCallingNext = 0;
+     long childrenMaxPeekMemory = 0;
+ 
+     for (int i = 0; i < 4; i++) {
+       Operator child = Mockito.mock(Operator.class);
+       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(1024L); // every child reports 1024 bytes for all three estimates
+       Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+       Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(1024L);
+       expectedMaxPeekMemory += 1024L; // same value as the stubbed child peak
+       childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+       expectedRetainedSizeAfterCallingNext += 1024L; // same value as the stubbed child retained size
+       children.add(child);
+     }
+ 
+     expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory); // larger of the accumulated total and the biggest single-child peak
+ 
+     DeviceViewOperator deviceViewOperator =
+         new DeviceViewOperator(
+             Mockito.mock(OperatorContext.class),
+             devices,
+             children,
+             new ArrayList<>(),
+             dataTypeList);
+ 
+     assertEquals(expectedMaxPeekMemory, deviceViewOperator.calculateMaxPeekMemory());
+     assertEquals(expectedMaxReturnSize, deviceViewOperator.calculateMaxReturnSize());
+     assertEquals(
+         expectedRetainedSizeAfterCallingNext, // sum accumulated over the four children above
+         deviceViewOperator.calculateRetainedSizeAfterCallingNext());
+   }
+ 
+   @Test
+   public void filterAndProjectOperatorTest() { // Checks FilterAndProjectOperator's memory estimates with one mocked child, one filter and one projection.
+     Operator child = Mockito.mock(Operator.class);
+     Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L); // child stubs: 2048 peak, 1024 return size, 512 retained
+     Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+     Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+     BooleanType booleanType = Mockito.mock(BooleanType.class);
+     Mockito.when(booleanType.getTypeEnum()).thenReturn(TypeEnum.BOOLEAN); // mocked type reports BOOLEAN
+     LongType longType = Mockito.mock(LongType.class);
+     Mockito.when(longType.getTypeEnum()).thenReturn(TypeEnum.INT64); // mocked type reports INT64
+     ColumnTransformer filterColumnTransformer = // filter: less-or-equal comparison of a time column with a constant column
+         new CompareLessEqualColumnTransformer(
+             booleanType,
+             new TimeColumnTransformer(longType),
+             new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class)));
+     List<TSDataType> filterOutputTypes = new ArrayList<>();
+     filterOutputTypes.add(TSDataType.INT32);
+     filterOutputTypes.add(TSDataType.INT64);
+     List<ColumnTransformer> projectColumnTransformers = new ArrayList<>();
+     projectColumnTransformers.add( // single projection: addition of a time column and a constant column
+         new ArithmeticAdditionColumnTransformer(
+             booleanType, // NOTE(review): the addition transformer is given the mocked boolean type - confirm this is intentional
+             new TimeColumnTransformer(longType),
+             new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class))));
+ 
+     FilterAndProjectOperator operator =
+         new FilterAndProjectOperator(
+             Mockito.mock(OperatorContext.class),
+             child,
+             filterOutputTypes,
+             new ArrayList<>(),
+             filterColumnTransformer,
+             new ArrayList<>(),
+             new ArrayList<>(),
+             projectColumnTransformers,
+             false,
+             true);
+ 
+     assertEquals(
+         4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + 512L, // four pages plus 512; NOTE(review): 512 presumably is the child's retained size - confirm
+         operator.calculateMaxPeekMemory());
+     assertEquals(
+         2 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(), // expected return size: twice the configured page size
+         operator.calculateMaxReturnSize());
+     assertEquals(512, operator.calculateRetainedSizeAfterCallingNext()); // same value as stubbed for the child's retained size
+   }
++
 +  @Test
 +  public void seriesAggregationScanOperatorTest() { // Checks SeriesAggregationScanOperator's peak-memory and return-size estimates with three aggregators.
 +    ExecutorService instanceNotificationExecutor = // handed to the state machine below; shut down in the finally block
 +        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
 +    try {
 +      MeasurementPath measurementPath =
 +          new MeasurementPath(
 +              "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
 +      Set<String> allSensors = Sets.newHashSet("sensor0");
 +
 +      QueryId queryId = new QueryId("stub_query");
 +      FragmentInstanceId instanceId =
 +          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
 +      FragmentInstanceStateMachine stateMachine =
 +          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 +      FragmentInstanceContext fragmentInstanceContext =
 +          createFragmentInstanceContext(instanceId, stateMachine);
 +      PlanNodeId planNodeId = new PlanNodeId("1");
 +      fragmentInstanceContext.addOperatorContext(
 +          1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName()); // NOTE(review): uses SeriesAggregationScanOperatorTest's simple name, not the enclosing test class's
 +
 +      SeriesAggregationScanOperator seriesAggregationScanOperator =
 +          new SeriesAggregationScanOperator(
 +              planNodeId,
 +              measurementPath,
 +              allSensors,
 +              fragmentInstanceContext.getOperatorContexts().get(0), // first operator context; presumably the one added just above
 +              Arrays.asList( // three aggregators: COUNT, MAX_VALUE and MIN_TIME, all INT32 with step SINGLE
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.COUNT, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE),
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.MAX_VALUE, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE),
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.MIN_TIME, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE)),
 +              null,
 +              true,
 +              null);
 +
 +      assertEquals(
 +          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() // expected peak: two pages plus the default max TsBlock size
 +              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
 +          seriesAggregationScanOperator.calculateMaxPeekMemory());
 +      assertEquals(
 +          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() // expected return size: same expression as the peak above
 +              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
 +          seriesAggregationScanOperator.calculateMaxReturnSize());
 +    } catch (IllegalPathException e) { // an invalid path counts as a test failure
 +      e.printStackTrace();
 +      fail();
 +    } finally {
 +      instanceNotificationExecutor.shutdown(); // runs whether the assertions pass or fail
 +    }
 +  }
  }