You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/17 14:13:56 UTC
[iotdb] branch master updated: [IOTDB-4955] Make DataPartition inherit policy configurable (#8017)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cd35746b28 [IOTDB-4955] Make DataPartition inherit policy configurable (#8017)
cd35746b28 is described below
commit cd35746b282a96addd49bc0decd3f516e6df3755
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Thu Nov 17 22:13:50 2022 +0800
[IOTDB-4955] Make DataPartition inherit policy configurable (#8017)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 14 +
.../confignode/conf/ConfigNodeDescriptor.java | 6 +
.../NotAvailableRegionGroupException.java | 13 +-
.../partition/GreedyPartitionAllocator.java | 70 +--
.../manager/partition/PartitionManager.java | 6 +-
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../it/partition/IoTDBPartitionDurableTest.java | 323 ++++++++++++
.../IoTDBPartitionGetterIT.java} | 582 ++++++---------------
.../partition/IoTDBPartitionInheritPolicyTest.java | 182 +++++++
.../confignode/it/utils/ConfigNodeTestUtils.java | 66 +++
.../resources/conf/iotdb-common.properties | 18 +-
12 files changed, 817 insertions(+), 478 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 65b989d307..52fa0b0462 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -68,6 +68,12 @@ public class ConfigNodeConfig {
private RegionBalancer.RegionAllocateStrategy regionAllocateStrategy =
RegionBalancer.RegionAllocateStrategy.GREEDY;
+ /**
+ * DataPartition within the same SeriesPartitionSlot will inherit the allocation result of the
+ * previous TimePartitionSlot if set to true
+ */
+ private boolean enableDataPartitionInheritPolicy = false;
+
/** Number of SeriesPartitionSlots per StorageGroup */
private int seriesPartitionSlotNum = 10000;
@@ -420,6 +426,14 @@ public class ConfigNodeConfig {
this.regionAllocateStrategy = regionAllocateStrategy;
}
+ public boolean isEnableDataPartitionInheritPolicy() {
+ return enableDataPartitionInheritPolicy;
+ }
+
+ public void setEnableDataPartitionInheritPolicy(boolean enableDataPartitionInheritPolicy) {
+ this.enableDataPartitionInheritPolicy = enableDataPartitionInheritPolicy;
+ }
+
public int getThriftServerAwaitTimeForStopService() {
return thriftServerAwaitTimeForStopService;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 976f0e3710..e5728c13bd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -222,6 +222,12 @@ public class ConfigNodeDescriptor {
"The configured region allocate strategy does not exist, use the default: GREEDY!");
}
+ conf.setEnableDataPartitionInheritPolicy(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_data_partition_inherit_policy",
+ String.valueOf(conf.isEnableDataPartitionInheritPolicy()))));
+
conf.setCnRpcAdvancedCompressionEnable(
Boolean.parseBoolean(
properties
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java
index a9687a804f..c7b44f36d9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java
@@ -18,10 +18,19 @@
*/
package org.apache.iotdb.confignode.exception;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+
public class NotAvailableRegionGroupException extends ConfigNodeException {
- public NotAvailableRegionGroupException() {
+ private static final String SCHEMA_REGION_GROUP = "SchemaRegionGroup";
+ private static final String DATA_REGION_GROUP = "DataRegionGroup";
+
+ public NotAvailableRegionGroupException(TConsensusGroupType regionGroupType) {
super(
- "There are no available RegionGroups currently, please check the status of cluster DataNodes");
+ String.format(
+ "There are no available %s RegionGroups currently, please use \"show cluster\" or \"show regions\" to check the cluster status",
+ TConsensusGroupType.SchemaRegion.equals(regionGroupType)
+ ? SCHEMA_REGION_GROUP
+ : DATA_REGION_GROUP));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
index 73b5263318..0e9290ec87 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
@@ -40,8 +41,10 @@ import java.util.concurrent.ConcurrentHashMap;
/** Allocating new Partitions by greedy algorithm */
public class GreedyPartitionAllocator implements IPartitionAllocator {
- private static final long TIME_PARTITION_INTERVAL =
- ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval();
+ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ private static final boolean ENABLE_DATA_PARTITION_INHERIT_POLICY =
+ CONF.isEnableDataPartitionInheritPolicy();
+ private static final long TIME_PARTITION_INTERVAL = CONF.getTimePartitionInterval();
private final IManager configManager;
@@ -109,39 +112,40 @@ public class GreedyPartitionAllocator implements IPartitionAllocator {
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
- /* Check if the current DataPartition has predecessor firstly, and inherit it if exists */
-
- // Check if the current Partition's predecessor is allocated
- // in the same batch of Partition creation
- TConsensusGroupId predecessor =
- seriesPartitionTable.getPrecededDataPartition(
- timePartitionSlot, TIME_PARTITION_INTERVAL);
- if (predecessor != null) {
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(predecessor));
- bubbleSort(predecessor, regionSlotsCounter);
- continue;
+ /* 1. Inherit policy */
+ if (ENABLE_DATA_PARTITION_INHERIT_POLICY) {
+ // Check if the current Partition's predecessor is allocated
+ // in the same batch of Partition creation
+ TConsensusGroupId predecessor =
+ seriesPartitionTable.getPrecededDataPartition(
+ timePartitionSlot, TIME_PARTITION_INTERVAL);
+ if (predecessor != null) {
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(timePartitionSlot, Collections.singletonList(predecessor));
+ bubbleSort(predecessor, regionSlotsCounter);
+ continue;
+ }
+
+ // Check if the current Partition's predecessor was allocated
+ // in the former Partition creation
+ predecessor =
+ getPartitionManager()
+ .getPrecededDataPartition(
+ storageGroup,
+ seriesPartitionEntry.getKey(),
+ timePartitionSlot,
+ TIME_PARTITION_INTERVAL);
+ if (predecessor != null) {
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(timePartitionSlot, Collections.singletonList(predecessor));
+ bubbleSort(predecessor, regionSlotsCounter);
+ continue;
+ }
}
- // Check if the current Partition's predecessor was allocated
- // in the former Partition creation
- predecessor =
- getPartitionManager()
- .getPrecededDataPartition(
- storageGroup,
- seriesPartitionEntry.getKey(),
- timePartitionSlot,
- TIME_PARTITION_INTERVAL);
- if (predecessor != null) {
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(predecessor));
- bubbleSort(predecessor, regionSlotsCounter);
- continue;
- }
-
- /* Greedy allocation */
+ /* 2. Greedy policy */
seriesPartitionTable
.getSeriesPartitionMap()
.put(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 817091fbfe..3be1411ff7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -218,7 +218,7 @@ public class PartitionManager {
assignedSchemaPartition =
getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
} catch (NotAvailableRegionGroupException e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error("Create SchemaPartition failed because: ", e);
resp.setStatus(
new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
.setMessage(e.getMessage()));
@@ -295,7 +295,7 @@ public class PartitionManager {
assignedDataPartition =
getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
} catch (NotAvailableRegionGroupException e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error("Create DataPartition failed because: ", e);
resp.setStatus(
new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
.setMessage(e.getMessage()));
@@ -521,7 +521,7 @@ public class PartitionManager {
}
if (result.isEmpty()) {
- throw new NotAvailableRegionGroupException();
+ throw new NotAvailableRegionGroupException(type);
}
result.sort(Comparator.comparingLong(Pair::getLeft));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index bff891fef8..f7fa4fe4fe 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -228,6 +228,13 @@ public class MppConfig implements BaseConfig {
return this;
}
+ @Override
+ public BaseConfig setEnableDataPartitionInheritPolicy(boolean enableDataPartitionInheritPolicy) {
+ confignodeProperties.setProperty(
+ "enable_data_partition_inherit_policy", String.valueOf(enableDataPartitionInheritPolicy));
+ return this;
+ }
+
@Override
public BaseConfig setDataReplicationFactor(int dataReplicationFactor) {
confignodeProperties.setProperty(
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 2f6fbcea9f..8f3dffdd1b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -223,6 +223,14 @@ public interface BaseConfig {
return "org.apache.iotdb.consensus.simple.SimpleConsensus";
}
+ default BaseConfig setEnableDataPartitionInheritPolicy(boolean enableDataPartitionInheritPolicy) {
+ return this;
+ }
+
+ default boolean isEnableDataPartitionInheritPolicy() {
+ return false;
+ }
+
default BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
return this;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableTest.java
new file mode 100644
index 0000000000..2219e5b267
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBPartitionDurableTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBPartitionDurableTest.class);
+
+ private static String originalConfigNodeConsensusProtocolClass;
+ private static String originalSchemaRegionConsensusProtocolClass;
+ private static String originalDataRegionConsensusProtocolClass;
+ private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+ private static int originalSchemaReplicationFactor;
+ private static int originalDataReplicationFactor;
+ private static final int testReplicationFactor = 3;
+
+ private static long originalTimePartitionInterval;
+ private static final long testTimePartitionInterval = 604800000;
+
+ private static final String sg = "root.sg";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ originalConfigNodeConsensusProtocolClass =
+ ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+ originalSchemaRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
+ originalDataRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
+ ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+
+ originalSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
+ originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
+ ConfigFactory.getConfig().setSchemaReplicationFactor(testReplicationFactor);
+ ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+
+ originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+ ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+
+ // Init 1C3D environment (presumably 1 ConfigNode and 3 DataNodes)
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+
+ ConfigFactory.getConfig()
+ .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+
+ ConfigFactory.getConfig().setSchemaReplicationFactor(originalSchemaReplicationFactor);
+ ConfigFactory.getConfig().setDataReplicationFactor(originalDataReplicationFactor);
+
+ ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+ }
+
+ // TODO: Optimize this in IOTDB-4334
+ @Test
+ public void testPartitionDurable() throws IOException, TException, InterruptedException {
+ final int testDataNodeId = 0;
+ final int seriesPartitionBatchSize = 10;
+ final int timePartitionBatchSize = 10;
+
+ // Shut down the DataNode with index testDataNodeId before any DataPartition is created
+ EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ final String sg0 = sg + 0;
+ final String sg1 = sg + 1;
+
+ // Set two StorageGroups (sg0 and sg1); both requests should succeed
+ TSetStorageGroupReq setStorageGroupReq =
+ new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+ TSStatus status = client.setStorageGroup(setStorageGroupReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
+ status = client.setStorageGroup(setStorageGroupReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ // Test getOrCreateDataPartition for sg0, ConfigNode should create the DataPartitions and return them
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg0,
+ 0,
+ seriesPartitionBatchSize,
+ 0,
+ timePartitionBatchSize,
+ testTimePartitionInterval);
+ TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ TDataPartitionTableResp dataPartitionTableResp = null;
+ for (int retry = 0; retry < 5; retry++) {
+ // Build a new client for each attempt since the connection is unstable in Win8 environment
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Log the failure, wait one second and retry in order to tolerate request timeouts
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ sg0,
+ 0,
+ seriesPartitionBatchSize,
+ 0,
+ timePartitionBatchSize,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+
+ // Count the Regions reported as Running and as Unknown
+ int runningCnt = 0;
+ int unknownCnt = 0;
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+ runningCnt += 1;
+ } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+ unknownCnt += 1;
+ }
+ }
+ // The runningCnt should be exactly twice the unknownCnt
+ // since one DataNode is shut down
+ Assert.assertEquals(unknownCnt * 2, runningCnt);
+
+ // Wait until some DataNode is reported as Unknown. NOTE(review): this loop has no sleep or timeout
+ TShowClusterResp showClusterResp;
+ while (true) {
+ boolean containUnknown = false;
+ showClusterResp = client.showCluster();
+ for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+ if (NodeStatus.Unknown.getStatus()
+ .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+ containUnknown = true;
+ break;
+ }
+ }
+ if (containUnknown) {
+ break;
+ }
+ }
+ runningCnt = 0;
+ unknownCnt = 0;
+ showClusterResp = client.showCluster();
+ for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+ if (NodeStatus.Running.getStatus()
+ .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+ runningCnt += 1;
+ } else if (NodeStatus.Unknown.getStatus()
+ .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+ unknownCnt += 1;
+ }
+ }
+ Assert.assertEquals(2, runningCnt);
+ Assert.assertEquals(1, unknownCnt);
+
+ // Test getOrCreateDataPartition for sg1, ConfigNode should create the DataPartitions and return them
+ partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg1,
+ 0,
+ seriesPartitionBatchSize,
+ 0,
+ timePartitionBatchSize,
+ testTimePartitionInterval);
+ dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ for (int retry = 0; retry < 5; retry++) {
+ // Build a new client for each attempt since the connection is unstable in Win8 environment
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Log the failure, wait one second and retry in order to tolerate request timeouts
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ sg1,
+ 0,
+ seriesPartitionBatchSize,
+ 0,
+ timePartitionBatchSize,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+
+ // Count the Regions reported as Running and as Unknown again
+ runningCnt = 0;
+ unknownCnt = 0;
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+ runningCnt += 1;
+ } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+ unknownCnt += 1;
+ }
+ }
+ // The runningCnt should be exactly twice the unknownCnt
+ // since one DataNode is shut down
+ Assert.assertEquals(unknownCnt * 2, runningCnt);
+
+ EnvFactory.getEnv().startDataNode(testDataNodeId);
+ // Wait until no DataNode is reported as Unknown. NOTE(review): this loop has no sleep or timeout
+ while (true) {
+ boolean containUnknown = false;
+ showClusterResp = client.showCluster();
+ for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+ if (NodeStatus.Unknown.getStatus()
+ .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+ containUnknown = true;
+ break;
+ }
+ }
+ if (!containUnknown) {
+ break;
+ }
+ }
+
+ // All Regions should be alive after the testDataNode is restarted (polled up to 30 times, 1s apart)
+ boolean allRunning = true;
+ for (int retry = 0; retry < 30; retry++) {
+ allRunning = true;
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (!RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+ allRunning = false;
+ break;
+ }
+ }
+ if (allRunning) {
+ break;
+ }
+
+ TimeUnit.SECONDS.sleep(1);
+ }
+ Assert.assertTrue(allRunning);
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
similarity index 53%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
index 05985a3e2f..3f15c498f8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
@@ -16,18 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.it;
+package org.apache.iotdb.confignode.it.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -36,15 +34,11 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
@@ -54,9 +48,9 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
-import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -66,7 +60,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -75,38 +68,40 @@ import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generateP
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBClusterPartitionIT {
+public class IoTDBPartitionGetterIT {
- private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClusterPartitionIT.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBPartitionGetterIT.class);
- protected static String originalConfigNodeConsensusProtocolClass;
- protected static String originalSchemaRegionConsensusProtocolClass;
- protected static String originalDataRegionConsensusProtocolClass;
+ private static String originalConfigNodeConsensusProtocolClass;
+ private static String originalSchemaRegionConsensusProtocolClass;
+ private static String originalDataRegionConsensusProtocolClass;
+ private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
- protected static int originalSchemaReplicationFactor;
- protected static int originalDataReplicationFactor;
+ private static int originalSchemaReplicationFactor;
+ private static int originalDataReplicationFactor;
private static final int testReplicationFactor = 3;
- protected static long originalTimePartitionInterval;
+ private static long originalTimePartitionInterval;
private static final long testTimePartitionInterval = 604800000;
private static final String sg = "root.sg";
private static final int storageGroupNum = 5;
private static final int seriesPartitionSlotsNum = 10000;
+ private static final int seriesPartitionBatchSize = 100;
private static final int timePartitionSlotsNum = 10;
+ private static final int timePartitionBatchSize = 10;
- @Before
- public void setUp() throws Exception {
+ @BeforeClass
+ public static void setUp() throws Exception {
originalConfigNodeConsensusProtocolClass =
ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
originalSchemaRegionConsensusProtocolClass =
ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
originalDataRegionConsensusProtocolClass =
ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
- ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
- ConfigFactory.getConfig()
- .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
- ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
originalSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
@@ -116,11 +111,103 @@ public class IoTDBClusterPartitionIT {
originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
- EnvFactory.getEnv().initBeforeClass();
+ // Init 1C3D environment
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ prepareData();
}
- @After
- public void tearDown() {
+ private static void prepareData()
+ throws IOException, InterruptedException, TException, IllegalPathException {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ /* Set StorageGroups */
+ for (int i = 0; i < storageGroupNum; i++) {
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
+ TSStatus status = client.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+
+ /* Create SchemaPartitions */
+ final String sg = "root.sg";
+ final String sg0 = "root.sg0";
+ final String sg1 = "root.sg1";
+
+ final String d00 = sg0 + ".d0.s";
+ final String d01 = sg0 + ".d1.s";
+ final String d10 = sg1 + ".d0.s";
+ final String d11 = sg1 + ".d1.s";
+
+ TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq();
+ TSchemaPartitionTableResp schemaPartitionTableResp;
+ Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable;
+
+ ByteBuffer buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
+ schemaPartitionReq.setPathPatternTree(buffer);
+ schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
+ Assert.assertEquals(2, schemaPartitionTableResp.getSchemaPartitionTableSize());
+ schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(schemaPartitionTable.containsKey(sg + i));
+ Assert.assertEquals(2, schemaPartitionTable.get(sg + i).size());
+ }
+
+ /* Create DataPartitions */
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = sg + i;
+ for (int j = 0; j < seriesPartitionSlotsNum; j += seriesPartitionBatchSize) {
+ for (long k = 0; k < timePartitionSlotsNum; k += timePartitionBatchSize) {
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ storageGroup,
+ j,
+ j + seriesPartitionBatchSize,
+ k,
+ k + timePartitionBatchSize,
+ testTimePartitionInterval);
+
+ // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
+ TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ TDataPartitionTableResp dataPartitionTableResp = null;
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
+ EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp =
+ configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ storageGroup,
+ j,
+ j + seriesPartitionBatchSize,
+ k,
+ k + timePartitionBatchSize,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+ }
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
EnvFactory.getEnv().cleanAfterClass();
ConfigFactory.getConfig()
@@ -137,37 +224,28 @@ public class IoTDBClusterPartitionIT {
}
@Test
- public void testGetAndCreateSchemaPartition()
+ public void testGetSchemaPartition()
throws TException, IOException, IllegalPathException, InterruptedException {
final String sg = "root.sg";
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
- final String d00 = sg0 + ".d0.s";
- final String d01 = sg0 + ".d1.s";
- final String d10 = sg1 + ".d0.s";
final String d11 = sg1 + ".d1.s";
final String allPaths = "root.**";
final String allSg0 = "root.sg0.**";
- final String allSg1 = "root.sg1.**";
+
+ final String notExistsSg = "root.sg10.**";
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- TSStatus status;
ByteBuffer buffer;
TSchemaPartitionReq schemaPartitionReq;
TSchemaPartitionTableResp schemaPartitionTableResp;
Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable;
- // Set StorageGroups
- status = client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg0)));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg1)));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
// Test getSchemaPartition, the result should be empty
- buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
+ buffer = generatePatternTreeBuffer(new String[] {notExistsSg});
schemaPartitionReq = new TSchemaPartitionReq(buffer);
schemaPartitionTableResp = client.getSchemaPartitionTable(schemaPartitionReq);
Assert.assertEquals(
@@ -175,20 +253,6 @@ public class IoTDBClusterPartitionIT {
schemaPartitionTableResp.getStatus().getCode());
Assert.assertEquals(0, schemaPartitionTableResp.getSchemaPartitionTableSize());
- // Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartitions and return
- buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
- schemaPartitionReq.setPathPatternTree(buffer);
- schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- schemaPartitionTableResp.getStatus().getCode());
- Assert.assertEquals(2, schemaPartitionTableResp.getSchemaPartitionTableSize());
- schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
- for (int i = 0; i < 2; i++) {
- Assert.assertTrue(schemaPartitionTable.containsKey(sg + i));
- Assert.assertEquals(2, schemaPartitionTable.get(sg + i).size());
- }
-
// Test getSchemaPartition, when a device path doesn't match any StorageGroup and including
// "**", ConfigNode will return all the SchemaPartitions
buffer = generatePatternTreeBuffer(new String[] {allPaths});
@@ -223,86 +287,20 @@ public class IoTDBClusterPartitionIT {
}
}
- private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- constructPartitionSlotsMap(
- String storageGroup,
- int seriesSlotStart,
- int seriesSlotEnd,
- long timeSlotStart,
- long timeSlotEnd) {
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result = new HashMap<>();
- result.put(storageGroup, new HashMap<>());
-
- for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
- TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
- result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
- for (long j = timeSlotStart; j < timeSlotEnd; j++) {
- TTimePartitionSlot timePartitionSlot =
- new TTimePartitionSlot(j * testTimePartitionInterval);
- result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
- }
- }
-
- return result;
- }
-
- private void checkDataPartitionMap(
- String storageGroup,
- int seriesSlotStart,
- int seriesSlotEnd,
- long timeSlotStart,
- long timeSlotEnd,
- Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
- dataPartitionTable) {
-
- Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
- Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
- seriesPartitionTable = dataPartitionTable.get(storageGroup);
- Assert.assertEquals(seriesSlotEnd - seriesSlotStart, seriesPartitionTable.size());
-
- for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
- TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
- Assert.assertTrue(seriesPartitionTable.containsKey(seriesPartitionSlot));
- Map<TTimePartitionSlot, List<TConsensusGroupId>> timePartitionTable =
- seriesPartitionTable.get(seriesPartitionSlot);
- Assert.assertEquals(timeSlotEnd - timeSlotStart, timePartitionTable.size());
-
- for (long j = timeSlotStart; j < timeSlotEnd; j++) {
- TTimePartitionSlot timePartitionSlot =
- new TTimePartitionSlot(j * testTimePartitionInterval);
- Assert.assertTrue(timePartitionTable.containsKey(timePartitionSlot));
- if (j > timeSlotStart) {
- // Check consistency
- Assert.assertEquals(
- timePartitionTable.get(
- new TTimePartitionSlot(timeSlotStart * testTimePartitionInterval)),
- timePartitionTable.get(timePartitionSlot));
- }
- }
- }
- }
-
@Test
- public void testGetAndCreateDataPartition() throws TException, IOException, InterruptedException {
+ public void testGetDataPartition() throws TException, IOException, InterruptedException {
final int seriesPartitionBatchSize = 100;
final int timePartitionBatchSize = 10;
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- TSStatus status;
TDataPartitionReq dataPartitionReq;
TDataPartitionTableResp dataPartitionTableResp;
// Prepare partitionSlotsMap
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
- constructPartitionSlotsMap(sg + 0, 0, 10, 0, 10);
-
- // Set StorageGroups
- for (int i = 0; i < storageGroupNum; i++) {
- TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
- status = client.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- }
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg + 10, 0, 10, 0, 10, testTimePartitionInterval);
// Test getDataPartitionTable, the result should be empty
dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
@@ -318,39 +316,13 @@ public class IoTDBClusterPartitionIT {
for (int j = 0; j < seriesPartitionSlotsNum; j += seriesPartitionBatchSize) {
for (long k = 0; k < timePartitionSlotsNum; k += timePartitionBatchSize) {
partitionSlotsMap =
- constructPartitionSlotsMap(
- storageGroup, j, j + seriesPartitionBatchSize, k, k + timePartitionBatchSize);
-
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
- dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient)
- EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp =
- configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- checkDataPartitionMap(
- storageGroup,
- j,
- j + seriesPartitionBatchSize,
- k,
- k + timePartitionBatchSize,
- dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ storageGroup,
+ j,
+ j + seriesPartitionBatchSize,
+ k,
+ k + timePartitionBatchSize,
+ testTimePartitionInterval);
// Test getDataPartition, the result should only contain DataPartition created before
dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
@@ -359,288 +331,30 @@ public class IoTDBClusterPartitionIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
dataPartitionTableResp.getStatus().getCode());
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- checkDataPartitionMap(
+ ConfigNodeTestUtils.checkDataPartitionTable(
storageGroup,
j,
j + seriesPartitionBatchSize,
k,
k + timePartitionBatchSize,
+ testTimePartitionInterval,
dataPartitionTableResp.getDataPartitionTable());
}
}
}
-
- // Test DataPartition inherit policy
- TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
- showRegionResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- // Normally, all Timeslots belonging to the same SeriesSlot are allocated to the
- // same DataRegionGroup
- Assert.assertEquals(
- regionInfo.getSeriesSlots() * timePartitionSlotsNum, regionInfo.getTimeSlots());
- });
- }
- }
-
- // TODO: Optimize in IOTDB-4334
- @Test
- public void testPartitionDurable() throws IOException, TException, InterruptedException {
- final int testDataNodeId = 0;
- final int seriesPartitionBatchSize = 10;
- final int timePartitionBatchSize = 10;
-
- // Shutdown the first DataNode
- EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
-
- try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- final String sg0 = sg + 0;
- final String sg1 = sg + 1;
-
- // Set StorageGroup, the result should be success
- TSetStorageGroupReq setStorageGroupReq =
- new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
- TSStatus status = client.setStorageGroup(setStorageGroupReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
- status = client.setStorageGroup(setStorageGroupReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
- constructPartitionSlotsMap(sg0, 0, seriesPartitionBatchSize, 0, timePartitionBatchSize);
- TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- TDataPartitionTableResp dataPartitionTableResp = null;
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- checkDataPartitionMap(
- sg0,
- 0,
- seriesPartitionBatchSize,
- 0,
- timePartitionBatchSize,
- dataPartitionTableResp.getDataPartitionTable());
-
- // Check Region count
- int runningCnt = 0;
- int unknownCnt = 0;
- TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
- runningCnt += 1;
- } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
- unknownCnt += 1;
- }
- }
- // The runningCnt should be exactly twice as the unknownCnt
- // since there exists one DataNode is shutdown
- Assert.assertEquals(unknownCnt * 2, runningCnt);
-
- // Wait for shutdown check
- TShowClusterResp showClusterResp;
- while (true) {
- boolean containUnknown = false;
- showClusterResp = client.showCluster();
- for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
- if (NodeStatus.Unknown.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- containUnknown = true;
- break;
- }
- }
- if (containUnknown) {
- break;
- }
- }
- runningCnt = 0;
- unknownCnt = 0;
- showClusterResp = client.showCluster();
- for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
- if (NodeStatus.Running.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- runningCnt += 1;
- } else if (NodeStatus.Unknown.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- unknownCnt += 1;
- }
- }
- Assert.assertEquals(2, runningCnt);
- Assert.assertEquals(1, unknownCnt);
-
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
- partitionSlotsMap =
- constructPartitionSlotsMap(sg1, 0, seriesPartitionBatchSize, 0, timePartitionBatchSize);
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- checkDataPartitionMap(
- sg1,
- 0,
- seriesPartitionBatchSize,
- 0,
- timePartitionBatchSize,
- dataPartitionTableResp.getDataPartitionTable());
-
- // Check Region count and status
- runningCnt = 0;
- unknownCnt = 0;
- showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
- runningCnt += 1;
- } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
- unknownCnt += 1;
- }
- }
- // The runningCnt should be exactly twice as the unknownCnt
- // since there exists one DataNode is shutdown
- Assert.assertEquals(unknownCnt * 2, runningCnt);
-
- EnvFactory.getEnv().startDataNode(testDataNodeId);
- // Wait for heartbeat check
- while (true) {
- boolean containUnknown = false;
- showClusterResp = client.showCluster();
- for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
- if (NodeStatus.Unknown.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- containUnknown = true;
- break;
- }
- }
- if (!containUnknown) {
- break;
- }
- }
-
- // All Regions should alive after the testDataNode is restarted
- boolean allRunning = true;
- for (int retry = 0; retry < 30; retry++) {
- allRunning = true;
- showRegionResp = client.showRegion(new TShowRegionReq());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (!RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
- allRunning = false;
- break;
- }
- }
- if (allRunning) {
- break;
- }
-
- TimeUnit.SECONDS.sleep(1);
- }
- Assert.assertTrue(allRunning);
}
}
@Test
public void testGetSlots()
throws TException, IOException, IllegalPathException, InterruptedException {
- final String sg = "root.sg";
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
- final String d00 = sg0 + ".d0.s";
- final String d01 = sg0 + ".d1.s";
- final String d10 = sg1 + ".d0.s";
- final String d11 = sg1 + ".d1.s";
-
- final int seriesPartitionBatchSize = 100;
- final int timePartitionBatchSize = 10;
-
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- ByteBuffer buffer;
- TSchemaPartitionReq schemaPartitionReq;
-
- // We assert the correctness of setting storageGroups, dataPartitions, schemaPartitions
- // Set StorageGroups
- client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg0)));
- client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg1)));
-
- // Create SchemaPartitions
- buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
- schemaPartitionReq = new TSchemaPartitionReq(buffer);
- TSchemaPartitionTableResp schemaPartitionTableResp =
- client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
- TSeriesPartitionSlot schemaSlot =
- new ArrayList<>(schemaPartitionTableResp.getSchemaPartitionTable().get(sg0).keySet())
- .get(0);
-
- TDataPartitionReq dataPartitionReq;
- TDataPartitionTableResp dataPartitionTableResp;
-
- // Prepare partitionSlotsMap
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap;
-
- // Create DataPartitions
- for (int i = 0; i < 2; i++) {
- String storageGroup = sg + i;
- partitionSlotsMap =
- constructPartitionSlotsMap(
- storageGroup, 0, seriesPartitionBatchSize, 0, timePartitionBatchSize);
-
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp =
- configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- }
-
- // Test getRouting api
+ // Test getSlots api
TGetRegionIdReq getRegionIdReq;
TGetRegionIdResp getRegionIdResp;
@@ -672,7 +386,17 @@ public class IoTDBClusterPartitionIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getRegionIdResp.status.getCode());
Assert.assertEquals(1, getRegionIdResp.getDataRegionIdListSize());
- getRegionIdReq.setSeriesSlotId(schemaSlot);
+ final String d00 = sg0 + ".d0.s";
+ final String d01 = sg0 + ".d1.s";
+ final String d10 = sg1 + ".d0.s";
+ final String d11 = sg1 + ".d1.s";
+ ByteBuffer buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
+ TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq(buffer);
+ TSchemaPartitionTableResp schemaPartitionTableResp =
+ client.getSchemaPartitionTable(schemaPartitionReq);
+ getRegionIdReq.setSeriesSlotId(
+ new ArrayList<>(schemaPartitionTableResp.getSchemaPartitionTable().get(sg0).keySet())
+ .get(0));
getRegionIdReq.setType(TConsensusGroupType.SchemaRegion);
getRegionIdResp = client.getRegionId(getRegionIdReq);
Assert.assertEquals(
@@ -722,15 +446,14 @@ public class IoTDBClusterPartitionIT {
getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getSeriesSlotListResp.status.getCode());
- Assert.assertEquals(102, getSeriesSlotListResp.getSeriesSlotListSize());
+ Assert.assertEquals(seriesPartitionSlotsNum, getSeriesSlotListResp.getSeriesSlotListSize());
getSeriesSlotListReq.setType(TConsensusGroupType.ConfigNodeRegion);
getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getSeriesSlotListResp.status.getCode());
- Assert.assertEquals(
- seriesPartitionBatchSize + 2, getSeriesSlotListResp.getSeriesSlotListSize());
+ Assert.assertEquals(seriesPartitionSlotsNum, getSeriesSlotListResp.getSeriesSlotListSize());
getSeriesSlotListReq.setType(TConsensusGroupType.SchemaRegion);
@@ -744,28 +467,19 @@ public class IoTDBClusterPartitionIT {
getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getSeriesSlotListResp.status.getCode());
- Assert.assertEquals(seriesPartitionBatchSize, getSeriesSlotListResp.getSeriesSlotListSize());
+ Assert.assertEquals(seriesPartitionSlotsNum, getSeriesSlotListResp.getSeriesSlotListSize());
}
}
@Test
public void testGetSchemaNodeManagementPartition()
throws IOException, TException, IllegalPathException, InterruptedException {
- final String sg = "root.sg";
- final int storageGroupNum = 2;
- TSStatus status;
TSchemaNodeManagementReq nodeManagementReq;
TSchemaNodeManagementResp nodeManagementResp;
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- // set StorageGroups
- for (int i = 0; i < storageGroupNum; i++) {
- TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
- status = client.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- }
ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
@@ -773,9 +487,9 @@ public class IoTDBClusterPartitionIT {
nodeManagementResp = client.getSchemaNodeManagementPartition(nodeManagementReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
- Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
+ Assert.assertEquals(5, nodeManagementResp.getMatchedNodeSize());
Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
- Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
+ Assert.assertEquals(2, nodeManagementResp.getSchemaRegionMapSize());
}
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java
new file mode 100644
index 0000000000..31bed5a0bf
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBPartitionInheritPolicyTest {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBPartitionInheritPolicyTest.class);
+
+ private static boolean originalEnableDataPartitionInheritPolicy;
+ private static final boolean testEnableDataPartitionInheritPolicy = true;
+
+ private static String originalDataRegionConsensusProtocolClass;
+ private static final String testDataRegionConsensusProtocolClass =
+ ConsensusFactory.RATIS_CONSENSUS;
+
+ private static int originalDataReplicationFactor;
+ private static final int testReplicationFactor = 3;
+
+ private static long originalTimePartitionInterval;
+ private static final long testTimePartitionInterval = 604800000;
+
+ private static final String sg = "root.sg";
+ private static final int storageGroupNum = 5;
+ private static final int seriesPartitionSlotsNum = 10000;
+ private static final int timePartitionSlotsNum = 10;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ originalDataRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass);
+
+ originalEnableDataPartitionInheritPolicy =
+ ConfigFactory.getConfig().isEnableDataPartitionInheritPolicy();
+ ConfigFactory.getConfig()
+ .setEnableDataPartitionInheritPolicy(testEnableDataPartitionInheritPolicy);
+
+ originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
+ ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+
+ originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+ ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+
+ EnvFactory.getEnv().initBeforeClass();
+
+ // Set StorageGroups
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ for (int i = 0; i < storageGroupNum; i++) {
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
+ TSStatus status = client.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setEnableDataPartitionInheritPolicy(originalEnableDataPartitionInheritPolicy);
+ ConfigFactory.getConfig().setDataReplicationFactor(originalDataReplicationFactor);
+ ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+ }
+
+ @Test
+ public void testDataPartitionInheritPolicy()
+ throws TException, IOException, InterruptedException {
+ final int seriesPartitionBatchSize = 100;
+ final int timePartitionBatchSize = 10;
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
+ TDataPartitionTableResp dataPartitionTableResp;
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap;
+
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = sg + i;
+ for (int j = 0; j < seriesPartitionSlotsNum; j += seriesPartitionBatchSize) {
+ for (long k = 0; k < timePartitionSlotsNum; k += timePartitionBatchSize) {
+ partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ storageGroup,
+ j,
+ j + seriesPartitionBatchSize,
+ k,
+ k + timePartitionBatchSize,
+ testTimePartitionInterval);
+
+ // Let ConfigNode create DataPartition
+ dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
+ EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp =
+ configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ }
+ }
+ }
+
+ // Test DataPartition inherit policy
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo -> {
+ // All Timeslots belonging to the same SeriesSlot are allocated to the same
+ // DataRegionGroup
+ Assert.assertEquals(
+ regionInfo.getSeriesSlots() * timePartitionSlotsNum, regionInfo.getTimeSlots());
+ });
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 4a09ffaeb3..6bd8ed4cfb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.confignode.it.utils;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
@@ -30,10 +33,14 @@ import org.apache.iotdb.it.env.DataNodeWrapper;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.thrift.TException;
+import org.junit.Assert;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -112,4 +119,63 @@ public class ConfigNodeTestUtils {
patternTree.serialize(baos);
return ByteBuffer.wrap(baos.toByteArray());
}
+
+ /**
+  * Builds a partition-slots map containing a single entry for storageGroup. Every series
+  * slot id i in [seriesSlotStart, seriesSlotEnd) is mapped to the list of time slots
+  * constructed from j * timePartitionInterval for j in [timeSlotStart, timeSlotEnd), in
+  * ascending order of j. Both end bounds are exclusive.
+  */
+ public static Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
+     constructPartitionSlotsMap(
+         String storageGroup, int seriesSlotStart, int seriesSlotEnd,
+         long timeSlotStart, long timeSlotEnd, long timePartitionInterval) {
+   Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotMap = new HashMap<>();
+   for (int seriesId = seriesSlotStart; seriesId < seriesSlotEnd; seriesId++) {
+     List<TTimePartitionSlot> timeSlots = new ArrayList<>();
+     for (long timeId = timeSlotStart; timeId < timeSlotEnd; timeId++) {
+       timeSlots.add(new TTimePartitionSlot(timeId * timePartitionInterval));
+     }
+     seriesSlotMap.put(new TSeriesPartitionSlot(seriesId), timeSlots);
+   }
+   Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> slotsMap = new HashMap<>();
+   slotsMap.put(storageGroup, seriesSlotMap);
+   return slotsMap;
+ }
+
+ /**
+  * Asserts that dataPartitionTable holds exactly the expected slots for storageGroup.
+  *
+  * <p>The checks run in this order: the table contains storageGroup; its series table has
+  * exactly seriesSlotEnd - seriesSlotStart entries; and, for every series slot id in
+  * [seriesSlotStart, seriesSlotEnd), the slot is present and maps to exactly
+  * timeSlotEnd - timeSlotStart time slots, among them the slot constructed from
+  * j * timePartitionInterval for every j in [timeSlotStart, timeSlotEnd).
+  * The first violated check fails the calling test through a JUnit assertion.
+  */
+ public static void checkDataPartitionTable(
+     String storageGroup, int seriesSlotStart, int seriesSlotEnd,
+     long timeSlotStart, long timeSlotEnd, long timePartitionInterval,
+     Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+         dataPartitionTable) {
+   // The StorageGroup must exist and own the expected number of SeriesPartitionSlots
+   Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
+   Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> seriesTable =
+       dataPartitionTable.get(storageGroup);
+   final int expectedSeriesSlots = seriesSlotEnd - seriesSlotStart;
+   Assert.assertEquals(expectedSeriesSlots, seriesTable.size());
+
+   final long expectedTimeSlots = timeSlotEnd - timeSlotStart;
+   for (int seriesId = seriesSlotStart; seriesId < seriesSlotEnd; seriesId++) {
+     // Each SeriesPartitionSlot must exist and own the expected number of TimePartitionSlots
+     TSeriesPartitionSlot seriesSlot = new TSeriesPartitionSlot(seriesId);
+     Assert.assertTrue(seriesTable.containsKey(seriesSlot));
+     Map<TTimePartitionSlot, List<TConsensusGroupId>> timeTable = seriesTable.get(seriesSlot);
+     Assert.assertEquals(expectedTimeSlots, timeTable.size());
+     for (long timeId = timeSlotStart; timeId < timeSlotEnd; timeId++) {
+       Assert.assertTrue(
+           timeTable.containsKey(new TTimePartitionSlot(timeId * timePartitionInterval)));
+     }
+   }
+ }
}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 1d12434b22..1ee8526096 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -24,7 +24,7 @@
# This parameter is unmodifiable after ConfigNode starts for the first time.
# These consensus protocols are currently supported:
# 1. org.apache.iotdb.consensus.ratis.RatisConsensus
-# Datatype: String
+# Datatype: string
# config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
# Default number of schema replicas
@@ -37,7 +37,7 @@
# These consensus protocols are currently supported:
# 1. org.apache.iotdb.consensus.simple.SimpleConsensus
# 2. org.apache.iotdb.consensus.ratis.RatisConsensus
-# Datatype: String
+# Datatype: string
# schema_region_consensus_protocol_class=org.apache.iotdb.consensus.simple.SimpleConsensus
# Default number of data replicas
@@ -51,7 +51,7 @@
# 1. org.apache.iotdb.consensus.simple.SimpleConsensus
# 2. org.apache.iotdb.consensus.multileader.MultiLeaderConsensus
# 3. org.apache.iotdb.consensus.ratis.RatisConsensus
-# Datatype: String
+# Datatype: string
# data_region_consensus_protocol_class=org.apache.iotdb.consensus.simple.SimpleConsensus
####################
@@ -72,19 +72,19 @@
# 4. SDBMHashExecutor
# Also, if you want to implement your own SeriesPartition executor, you can inherit the SeriesPartitionExecutor class and
# modify this parameter to correspond to your Java class
-# Datatype: String
+# Datatype: string
# series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor
# The maximum number of SchemaRegion expected to be managed by each DataNode.
# Notice: Since each StorageGroup requires at least one SchemaRegion to manage its schema,
# this parameter doesn't limit the number of SchemaRegions when there are too many StorageGroups.
-# Datatype: Double
+# Datatype: double
# schema_region_per_data_node=1.0
# The maximum number of DataRegion expected to be managed by each processor.
# Notice: Since each StorageGroup requires at least two DataRegions to manage its data,
# this parameter doesn't limit the number of DataRegions when there are too many StorageGroups.
-# Datatype: Double
+# Datatype: double
# data_region_per_processor=0.5
# Region allocate strategy
@@ -94,6 +94,12 @@
# Datatype: string
# region_allocate_strategy=GREEDY
+# Whether to enable the DataPartition inherit policy.
+# DataPartition within the same SeriesPartitionSlot will inherit
+# the allocation result of the previous TimePartitionSlot if set to true
+# Datatype: boolean
+# enable_data_partition_inherit_policy=false
+
# The routing policy of read/write requests
# These routing policies are currently supported:
# 1. leader(Default, routing to leader replica)