You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/29 01:31:21 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0] Add logs for AdjustRegionGroupNum process (#8233)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 6b6c5d72c5 [To rel/1.0] Add logs for AdjustRegionGroupNum process (#8233)
6b6c5d72c5 is described below
commit 6b6c5d72c58003412cb9753082c3e9a62c8feef6
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Nov 29 09:31:17 2022 +0800
[To rel/1.0] Add logs for AdjustRegionGroupNum process (#8233)
---
.../confignode/manager/ClusterSchemaManager.java | 11 ++
.../iotdb/confignode/manager/load/LoadManager.java | 2 +-
.../manager/load/balancer/RouteBalancer.java | 2 +-
.../load/balancer/router/RegionRouteMap.java | 5 +
.../impl/schema/DeleteStorageGroupProcedure.java | 5 +
.../partition/IoTDBAutoRegionGroupExtensionIT.java | 200 +++++++++++++++++++++
6 files changed, 223 insertions(+), 2 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 0a3b23f6af..85e1b8475e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -331,6 +331,9 @@ public class ClusterSchemaManager {
// The leastDataRegionGroupNum should be the maximum integer that satisfy:
// 1 <= leastDataRegionGroupNum <= 5(default)
CONF.setLeastDataRegionGroupNum(leastDataRegionGroupNum);
+ LOGGER.info(
+ "[AdjustRegionGroupNum] The least number of DataRegionGroups per Database is adjusted to: {}",
+ leastDataRegionGroupNum);
}
AdjustMaxRegionGroupNumPlan adjustMaxRegionGroupNumPlan = new AdjustMaxRegionGroupNumPlan();
@@ -363,6 +366,10 @@ public class ClusterSchemaManager {
(storageGroupNum
* storageGroupSchema.getSchemaReplicationFactor())),
allocatedSchemaRegionGroupCount));
+ LOGGER.info(
+ "[AdjustRegionGroupNum] The maximum number of SchemaRegionGroups for Database: {} is adjusted to: {}",
+ storageGroupSchema.getName(),
+ maxSchemaRegionGroupNum);
// Adjust maxDataRegionGroupNum for each StorageGroup.
// All StorageGroups divide the total cpu cores equally.
@@ -390,6 +397,10 @@ public class ClusterSchemaManager {
(storageGroupNum
* storageGroupSchema.getDataReplicationFactor())),
allocatedDataRegionGroupCount));
+ LOGGER.info(
+ "[AdjustRegionGroupNum] The maximum number of DataRegionGroups for Database: {} is adjusted to: {}",
+ storageGroupSchema.getName(),
+ maxDataRegionGroupNum);
adjustMaxRegionGroupNumPlan.putEntry(
storageGroupSchema.getName(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 328e7f9021..59f28aa529 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -222,7 +222,7 @@ public class LoadManager {
// Update RegionRouteMap
if (routeBalancer.updateRegionRouteMap()) {
isNeedBroadcast = true;
- recordRegionRouteMap(routeBalancer.getLatestRegionRouteMap());
+ recordRegionRouteMap(routeBalancer.getRegionRouteMap());
}
if (isNeedBroadcast) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index c1d84de2ce..31f28397fb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -401,7 +401,7 @@ public class RouteBalancer {
return regionRouteMap.getRegionPriorityMap();
}
- public RegionRouteMap getLatestRegionRouteMap() {
+ public RegionRouteMap getRegionRouteMap() {
return regionRouteMap;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
index f2d1971d30..58f456ab8f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
@@ -82,6 +82,11 @@ public class RegionRouteMap {
this.regionPriorityMap = regionPriorityMap;
}
+ public void removeRegionRouteCache(TConsensusGroupId regionGroupId) {
+ this.regionLeaderMap.remove(regionGroupId);
+ this.regionPriorityMap.remove(regionGroupId);
+ }
+
public void serialize(OutputStream stream, TProtocol protocol) throws IOException {
try {
ReadWriteIOUtils.write(regionLeaderMap.size(), stream);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
index a9b507dc71..fddf085734 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
@@ -115,6 +115,11 @@ public class DeleteStorageGroupProcedure
env.getConfigManager()
.getPartitionManager()
.removeRegionGroupCache(regionReplicaSet.getRegionId());
+ env.getConfigManager()
+ .getLoadManager()
+ .getRouteBalancer()
+ .getRegionRouteMap()
+ .removeRegionRouteCache(regionReplicaSet.getRegionId());
});
env.getConfigManager().getConsensusManager().write(offerPlan);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
new file mode 100644
index 0000000000..436fa94611
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.env.BaseConfig;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBAutoRegionGroupExtensionIT {
+
+ private static final BaseConfig CONF = ConfigFactory.getConfig();
+
+ private static String originalDataRegionGroupExtensionPolicy;
+ private static final String testDataRegionGroupExtensionPolicy = "AUTO";
+
+ private static String originalSchemaRegionConsensusProtocolClass;
+ private static String originalDataRegionConsensusProtocolClass;
+ private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+ private static int originalSchemaReplicationFactor;
+ private static int originalDataReplicationFactor;
+ private static final int testReplicationFactor = 1;
+
+ private static long originalTimePartitionInterval;
+
+ private static int originalLeastDataRegionGroupNum;
+
+ private static final String sg = "root.sg";
+ private static final int testSgNum = 2;
+
+ @Before
+ public void setUp() throws Exception {
+ originalSchemaRegionConsensusProtocolClass = CONF.getSchemaRegionConsensusProtocolClass();
+ originalDataRegionConsensusProtocolClass = CONF.getDataRegionConsensusProtocolClass();
+ CONF.setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+ CONF.setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+ // Remember both replication factors, then force them to testReplicationFactor.
+ originalSchemaReplicationFactor = CONF.getSchemaReplicationFactor();
+ originalDataReplicationFactor = CONF.getDataReplicationFactor();
+ CONF.setSchemaReplicationFactor(testReplicationFactor);
+ CONF.setDataReplicationFactor(testReplicationFactor);
+ // Recorded only (never overridden here); passed on when the partition slots map is built.
+ originalTimePartitionInterval = CONF.getTimePartitionInterval();
+ // Recorded only (never overridden here); caps the expected DataRegionGroup count.
+ originalLeastDataRegionGroupNum = CONF.getLeastDataRegionGroupNum();
+ // Remember the extension policy, then switch to the policy under test.
+ originalDataRegionGroupExtensionPolicy = CONF.getDataRegionGroupExtensionPolicy();
+ CONF.setDataRegionGroupExtensionPolicy(testDataRegionGroupExtensionPolicy);
+
+ // Init 1C3D environment
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+ // Put back each configuration value that setUp() overrode, using the saved originals.
+ CONF.setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+ CONF.setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+ CONF.setSchemaReplicationFactor(originalSchemaReplicationFactor);
+ CONF.setDataReplicationFactor(originalDataReplicationFactor);
+ CONF.setDataRegionGroupExtensionPolicy(originalDataRegionGroupExtensionPolicy);
+ }
+
+ @Test
+ public void testAutoRegionGroupExtensionPolicy()
+ throws IOException, InterruptedException, TException {
+ // Upper bound on the one-second polls spent waiting for the RegionGroups to disappear.
+ final int retryNum = 100;
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // First round: create the StorageGroups and check their region distribution.
+ setStorageGroupAndCheckRegionGroupDistribution(client);
+
+ // Delete all StorageGroups
+ for (int i = 0; i < testSgNum; i++) {
+ String curSg = sg + i;
+ client.deleteStorageGroup(new TDeleteStorageGroupReq(curSg));
+ }
+ boolean isAllRegionGroupDeleted = false;
+ for (int retry = 0; retry < retryNum; retry++) {
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ if (showRegionResp.getRegionInfoListSize() == 0) {
+ isAllRegionGroupDeleted = true;
+ break;
+ }
+ // Regions are still listed; wait one second before polling again.
+ TimeUnit.SECONDS.sleep(1);
+ }
+ Assert.assertTrue(isAllRegionGroupDeleted);
+
+ // Re-test for safety
+ setStorageGroupAndCheckRegionGroupDistribution(client);
+ }
+ }
+
+ private void setStorageGroupAndCheckRegionGroupDistribution(SyncConfigNodeIServiceClient client)
+ throws TException {
+ for (int i = 0; i < testSgNum; i++) {
+ String curSg = sg + i;
+ TSStatus status =
+ client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(curSg)));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+ // All testSgNum StorageGroups are set above before any DataPartition is requested.
+ for (int i = 0; i < testSgNum; i++) {
+ String curSg = sg + i;
+
+ /* Insert a DataPartition to create DataRegionGroups */
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ curSg, 0, 10, 0, 10, originalTimePartitionInterval);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(new TDataPartitionReq(partitionSlotsMap));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ }
+ // Expected count per StorageGroup: min(originalLeastDataRegionGroupNum,
+ // ceil(total reported cpu cores / (testSgNum * testReplicationFactor))).
+ int totalCpuCoreNum = 0;
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ for (TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) {
+ totalCpuCoreNum += dataNodeInfo.getCpuCoreNum();
+ }
+ final int leastDataRegionGroupNum =
+ Math.min(
+ originalLeastDataRegionGroupNum,
+ (int)
+ Math.ceil((double) totalCpuCoreNum / (double) (testSgNum * testReplicationFactor)));
+ // NOTE(review): TShowRegionReq is built without arguments; every returned region is counted.
+ /* Check the number of DataRegionGroups */
+ TShowRegionResp showRegionReq = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionReq.getStatus().getCode());
+ Map<String, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+ showRegionReq
+ .getRegionInfoList()
+ .forEach(
+ regionInfo ->
+ regionCounter
+ .computeIfAbsent(regionInfo.getStorageGroup(), empty -> new AtomicInteger(0))
+ .getAndIncrement());
+ Assert.assertEquals(testSgNum, regionCounter.size());
+ regionCounter.forEach(
+ (sg, regionCount) -> Assert.assertEquals(leastDataRegionGroupNum, regionCount.get()));
+ }
+}