You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/29 01:31:21 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0] Add logs for AdjustRegionGroupNum process (#8233)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 6b6c5d72c5  [To rel/1.0] Add logs for AdjustRegionGroupNum process (#8233)
6b6c5d72c5 is described below

commit 6b6c5d72c58003412cb9753082c3e9a62c8feef6
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Nov 29 09:31:17 2022 +0800

     [To rel/1.0] Add logs for AdjustRegionGroupNum process (#8233)
---
 .../confignode/manager/ClusterSchemaManager.java   |  11 ++
 .../iotdb/confignode/manager/load/LoadManager.java |   2 +-
 .../manager/load/balancer/RouteBalancer.java       |   2 +-
 .../load/balancer/router/RegionRouteMap.java       |   5 +
 .../impl/schema/DeleteStorageGroupProcedure.java   |   5 +
 .../partition/IoTDBAutoRegionGroupExtensionIT.java | 200 +++++++++++++++++++++
 6 files changed, 223 insertions(+), 2 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 0a3b23f6af..85e1b8475e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -331,6 +331,9 @@ public class ClusterSchemaManager {
       // The leastDataRegionGroupNum should be the maximum integer that satisfy:
       // 1 <= leastDataRegionGroupNum <= 5(default)
       CONF.setLeastDataRegionGroupNum(leastDataRegionGroupNum);
+      LOGGER.info(
+          "[AdjustRegionGroupNum] The least number of DataRegionGroups per Database is adjusted to: {}",
+          leastDataRegionGroupNum);
     }
 
     AdjustMaxRegionGroupNumPlan adjustMaxRegionGroupNumPlan = new AdjustMaxRegionGroupNumPlan();
@@ -363,6 +366,10 @@ public class ClusterSchemaManager {
                                     (storageGroupNum
                                         * storageGroupSchema.getSchemaReplicationFactor())),
                     allocatedSchemaRegionGroupCount));
+        LOGGER.info(
+            "[AdjustRegionGroupNum] The maximum number of SchemaRegionGroups for Database: {} is adjusted to: {}",
+            storageGroupSchema.getName(),
+            maxSchemaRegionGroupNum);
 
         // Adjust maxDataRegionGroupNum for each StorageGroup.
         // All StorageGroups divide the total cpu cores equally.
@@ -390,6 +397,10 @@ public class ClusterSchemaManager {
                                     (storageGroupNum
                                         * storageGroupSchema.getDataReplicationFactor())),
                     allocatedDataRegionGroupCount));
+        LOGGER.info(
+            "[AdjustRegionGroupNum] The maximum number of DataRegionGroups for Database: {} is adjusted to: {}",
+            storageGroupSchema.getName(),
+            maxDataRegionGroupNum);
 
         adjustMaxRegionGroupNumPlan.putEntry(
             storageGroupSchema.getName(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 328e7f9021..59f28aa529 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -222,7 +222,7 @@ public class LoadManager {
     // Update RegionRouteMap
     if (routeBalancer.updateRegionRouteMap()) {
       isNeedBroadcast = true;
-      recordRegionRouteMap(routeBalancer.getLatestRegionRouteMap());
+      recordRegionRouteMap(routeBalancer.getRegionRouteMap());
     }
 
     if (isNeedBroadcast) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index c1d84de2ce..31f28397fb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -401,7 +401,7 @@ public class RouteBalancer {
     return regionRouteMap.getRegionPriorityMap();
   }
 
-  public RegionRouteMap getLatestRegionRouteMap() {
+  public RegionRouteMap getRegionRouteMap() {
     return regionRouteMap;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
index f2d1971d30..58f456ab8f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
@@ -82,6 +82,11 @@ public class RegionRouteMap {
     this.regionPriorityMap = regionPriorityMap;
   }
 
+  public void removeRegionRouteCache(TConsensusGroupId regionGroupId) {
+    this.regionLeaderMap.remove(regionGroupId); // drop this RegionGroup's entry from the leader map
+    this.regionPriorityMap.remove(regionGroupId); // and from the priority map, same key
+  }
+
   public void serialize(OutputStream stream, TProtocol protocol) throws IOException {
     try {
       ReadWriteIOUtils.write(regionLeaderMap.size(), stream);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
index a9b507dc71..fddf085734 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteStorageGroupProcedure.java
@@ -115,6 +115,11 @@ public class DeleteStorageGroupProcedure
                 env.getConfigManager()
                     .getPartitionManager()
                     .removeRegionGroupCache(regionReplicaSet.getRegionId());
+                env.getConfigManager()
+                    .getLoadManager()
+                    .getRouteBalancer()
+                    .getRegionRouteMap()
+                    .removeRegionRouteCache(regionReplicaSet.getRegionId());
               });
           env.getConfigManager().getConsensusManager().write(offerPlan);
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
new file mode 100644
index 0000000000..436fa94611
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.env.BaseConfig;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Checks the RegionGroups created per StorageGroup when the extension policy is "AUTO". */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBAutoRegionGroupExtensionIT {
+
+  private static final BaseConfig CONF = ConfigFactory.getConfig();
+
+  private static String originalDataRegionGroupExtensionPolicy;
+  private static final String testDataRegionGroupExtensionPolicy = "AUTO";
+
+  private static String originalSchemaRegionConsensusProtocolClass;
+  private static String originalDataRegionConsensusProtocolClass;
+  private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+  private static int originalSchemaReplicationFactor;
+  private static int originalDataReplicationFactor;
+  private static final int testReplicationFactor = 1;
+
+  private static long originalTimePartitionInterval;
+  private static int originalLeastDataRegionGroupNum;
+
+  private static final String sg = "root.sg";
+  private static final int testSgNum = 2;
+
+  /** Saves the current config, applies the test config, then inits the cluster environment. */
+  @Before
+  public void setUp() throws Exception {
+    originalSchemaRegionConsensusProtocolClass = CONF.getSchemaRegionConsensusProtocolClass();
+    originalDataRegionConsensusProtocolClass = CONF.getDataRegionConsensusProtocolClass();
+    CONF.setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+    CONF.setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+
+    originalSchemaReplicationFactor = CONF.getSchemaReplicationFactor();
+    originalDataReplicationFactor = CONF.getDataReplicationFactor();
+    CONF.setSchemaReplicationFactor(testReplicationFactor);
+    CONF.setDataReplicationFactor(testReplicationFactor);
+
+    // Only read, never overridden: used for the partition slots and the expected region count
+    originalTimePartitionInterval = CONF.getTimePartitionInterval();
+    originalLeastDataRegionGroupNum = CONF.getLeastDataRegionGroupNum();
+
+    originalDataRegionGroupExtensionPolicy = CONF.getDataRegionGroupExtensionPolicy();
+    CONF.setDataRegionGroupExtensionPolicy(testDataRegionGroupExtensionPolicy);
+
+    // Init 1C3D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  /** Cleans the test environment, then restores every setting that setUp() overrode. */
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanAfterClass();
+
+    CONF.setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+    CONF.setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+    CONF.setSchemaReplicationFactor(originalSchemaReplicationFactor);
+    CONF.setDataReplicationFactor(originalDataReplicationFactor);
+    CONF.setDataRegionGroupExtensionPolicy(originalDataRegionGroupExtensionPolicy);
+  }
+
+  /** Creates the StorageGroups and checks them, deletes them all, then repeats the check. */
+  @Test
+  public void testAutoRegionGroupExtensionPolicy()
+      throws IOException, InterruptedException, TException {
+    final int retryNum = 100;
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      setStorageGroupAndCheckRegionGroupDistribution(client);
+
+      // Delete all StorageGroups
+      for (int i = 0; i < testSgNum; i++) {
+        client.deleteStorageGroup(new TDeleteStorageGroupReq(sg + i));
+      }
+
+      // Poll once per second, at most retryNum times, until showRegion lists no region
+      boolean isAllRegionGroupDeleted = false;
+      for (int retry = 0; retry < retryNum; retry++) {
+        TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+        if (showRegionResp.getRegionInfoListSize() == 0) {
+          isAllRegionGroupDeleted = true;
+          break;
+        }
+        TimeUnit.SECONDS.sleep(1);
+      }
+      Assert.assertTrue(isAllRegionGroupDeleted);
+
+      // Re-test for safety
+      setStorageGroupAndCheckRegionGroupDistribution(client);
+    }
+  }
+
+  /** Creates the StorageGroups and a DataPartition in each, then checks their region counts. */
+  private void setStorageGroupAndCheckRegionGroupDistribution(SyncConfigNodeIServiceClient client)
+      throws TException {
+    for (int i = 0; i < testSgNum; i++) {
+      String curSg = sg + i;
+      TSStatus status =
+          client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(curSg)));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+
+    for (int i = 0; i < testSgNum; i++) {
+      String curSg = sg + i;
+      /* Insert a DataPartition to create DataRegionGroups */
+      Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+          ConfigNodeTestUtils.constructPartitionSlotsMap(
+              curSg, 0, 10, 0, 10, originalTimePartitionInterval);
+      TDataPartitionTableResp dataPartitionTableResp =
+          client.getOrCreateDataPartitionTable(new TDataPartitionReq(partitionSlotsMap));
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          dataPartitionTableResp.getStatus().getCode());
+    }
+
+    // Re-calculate the least DataRegionGroup num based on the test resource
+    int totalCpuCoreNum = 0;
+    TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+    for (TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) {
+      totalCpuCoreNum += dataNodeInfo.getCpuCoreNum();
+    }
+    final int leastDataRegionGroupNum =
+        Math.min(
+            originalLeastDataRegionGroupNum,
+            (int)
+                Math.ceil((double) totalCpuCoreNum / (double) (testSgNum * testReplicationFactor)));
+
+    /* Check the number of DataRegionGroups */
+    TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+    Map<String, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+    showRegionResp
+        .getRegionInfoList()
+        .forEach(
+            regionInfo ->
+                regionCounter
+                    .computeIfAbsent(regionInfo.getStorageGroup(), empty -> new AtomicInteger(0))
+                    .getAndIncrement());
+    Assert.assertEquals(testSgNum, regionCounter.size());
+    regionCounter.forEach(
+        (curSg, regionCount) -> Assert.assertEquals(leastDataRegionGroupNum, regionCount.get()));
+  }
+}