You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/26 01:28:20 UTC
[iotdb] branch master updated: [IOTDB-4334] Verify durable cases of RegionGroup extension (#8133)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aa9d16cec5 [IOTDB-4334] Verify durable cases of RegionGroup extension (#8133)
aa9d16cec5 is described below
commit aa9d16cec553d99a27cf9a98d76b27cb48f160a2
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sat Nov 26 09:28:14 2022 +0800
[IOTDB-4334] Verify durable cases of RegionGroup extension (#8133)
---
.../client/async/AsyncDataNodeClientPool.java | 13 +-
.../confignode/manager/ClusterSchemaManager.java | 43 +-
.../iotdb/confignode/manager/ConfigManager.java | 3 +-
.../manager/load/balancer/RouteBalancer.java | 9 +-
.../router/leader/MinCostFlowLeaderBalancer.java | 2 +-
.../manager/partition/PartitionManager.java | 16 +-
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 49 +-
.../org/apache/iotdb/it/env/DataNodeWrapper.java | 10 +-
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 +
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 18 +-
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 10 +
.../it/IoTDBClusterRegionLeaderBalancingIT.java | 2 +-
.../it/partition/IoTDBPartitionDurableIT.java | 626 +++++++++++++++++++++
.../it/partition/IoTDBPartitionDurableTest.java | 323 -----------
.../it/partition/IoTDBPartitionGetterIT.java | 37 +-
...est.java => IoTDBPartitionInheritPolicyIT.java} | 28 +-
.../org/apache/iotdb/db/it/env/StandaloneEnv.java | 18 +-
18 files changed, 825 insertions(+), 397 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index bfd686a890..3973689b3b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -79,19 +79,28 @@ public class AsyncDataNodeClientPool {
}
/**
- * Send asynchronous requests to the specified DataNodes
+ * Send asynchronous requests to the specified DataNodes with default retry num
*
* <p>Notice: The DataNodes that failed to receive the requests will be reconnected
*
* @param clientHandler <RequestType, ResponseType> which will also contain the result
*/
public void sendAsyncRequestToDataNodeWithRetry(AsyncClientHandler<?, ?> clientHandler) {
+ sendAsyncRequest(clientHandler, MAX_RETRY_NUM);
+ }
+
+ public void sendAsyncRequestToDataNodeWithRetry(
+ AsyncClientHandler<?, ?> clientHandler, int retryNum) {
+ sendAsyncRequest(clientHandler, retryNum);
+ }
+
+ private void sendAsyncRequest(AsyncClientHandler<?, ?> clientHandler, int retryNum) {
if (clientHandler.getRequestIndices().isEmpty()) {
return;
}
DataNodeRequestType requestType = clientHandler.getRequestType();
- for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
+ for (int retry = 0; retry < retryNum; retry++) {
// Always Reset CountDownLatch first
clientHandler.resetCountDownLatch();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 07b7554f1c..cf5c5e4c42 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -324,7 +324,7 @@ public class ClusterSchemaManager {
try {
// Adjust maxSchemaRegionGroupNum for each StorageGroup.
// All StorageGroups share the DataNodes equally.
- // Allocated SchemaRegionGroups are not shrunk.
+ // The allocated SchemaRegionGroups will not be shrunk.
int allocatedSchemaRegionGroupCount =
getPartitionManager()
.getRegionGroupCount(
@@ -335,20 +335,24 @@ public class ClusterSchemaManager {
// by parameter least_schema_region_group_num, which is currently unconfigurable.
LEAST_SCHEMA_REGION_GROUP_NUM,
Math.max(
- // The maxSchemaRegionGroupNum of the current StorageGroup is expected to be
- // (SCHEMA_REGION_PER_DATA_NODE * registerDataNodeNum) /
- // (createdStorageGroupNum * schemaReplicationFactor)
(int)
- (SCHEMA_REGION_PER_DATA_NODE
- * dataNodeNum
- / (double)
- (storageGroupNum
- * storageGroupSchema.getSchemaReplicationFactor())),
+ // Use Math.ceil here to ensure that the maxSchemaRegionGroupNum
+ // will be increased as long as the number of cluster DataNodes is increased
+ Math.ceil(
+ // The maxSchemaRegionGroupNum of the current StorageGroup
+ // is expected to be:
+ // (SCHEMA_REGION_PER_DATA_NODE * registerDataNodeNum) /
+ // (createdStorageGroupNum * schemaReplicationFactor)
+ SCHEMA_REGION_PER_DATA_NODE
+ * dataNodeNum
+ / (double)
+ (storageGroupNum
+ * storageGroupSchema.getSchemaReplicationFactor())),
allocatedSchemaRegionGroupCount));
// Adjust maxDataRegionGroupNum for each StorageGroup.
// All StorageGroups divide the total cpu cores equally.
- // Allocated DataRegionGroups are not shrunk.
+ // The allocated DataRegionGroups will not be shrunk.
int allocatedDataRegionGroupCount =
getPartitionManager()
.getRegionGroupCount(storageGroupSchema.getName(), TConsensusGroupType.DataRegion);
@@ -358,14 +362,19 @@ public class ClusterSchemaManager {
// by parameter least_data_region_group_num.
LEAST_DATA_REGION_GROUP_NUM,
Math.max(
- // The maxDataRegionGroupNum of the current StorageGroup is expected to be
- // (DATA_REGION_PER_PROCESSOR * totalCpuCoreNum) /
- // (createdStorageGroupNum * dataReplicationFactor)
(int)
- (DATA_REGION_PER_PROCESSOR
- * totalCpuCoreNum
- / (double)
- (storageGroupNum * storageGroupSchema.getDataReplicationFactor())),
+ // Use Math.ceil here to ensure that the maxDataRegionGroupNum
+ // will be increased as long as the number of cluster DataNodes is increased
+ Math.ceil(
+ // The maxDataRegionGroupNum of the current StorageGroup
+ // is expected to be:
+ // (DATA_REGION_PER_PROCESSOR * totalCpuCoreNum) /
+ // (createdStorageGroupNum * dataReplicationFactor)
+ DATA_REGION_PER_PROCESSOR
+ * totalCpuCoreNum
+ / (double)
+ (storageGroupNum
+ * storageGroupSchema.getDataReplicationFactor())),
allocatedDataRegionGroupCount));
adjustMaxRegionGroupNumPlan.putEntry(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index e521529831..f5dc00c5d0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -635,8 +635,7 @@ public class ConfigManager implements IManager {
resp = queryResult.convertToTDataPartitionTableResp();
- // TODO: set debug
- LOGGER.info(
+ LOGGER.debug(
"GetOrCreateDataPartition success. receive PartitionSlotsMap: {}, return: {}",
getOrCreateDataPartitionReq.getPartitionSlotsMap(),
resp);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 6e0a7d95a3..cfd488dc4a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -247,8 +247,8 @@ public class RouteBalancer {
leaderBalancingExecutor,
this::balancingRegionLeader,
0,
- // Execute route balancing service in every 5 loops of heartbeat service
- NodeManager.HEARTBEAT_INTERVAL * 5,
+ // Execute route balancing service in every 20 loops of heartbeat service
+ NodeManager.HEARTBEAT_INTERVAL * 20,
TimeUnit.MILLISECONDS);
LOGGER.info("Route-Balancing service is started successfully.");
}
@@ -293,7 +293,7 @@ public class RouteBalancer {
new AsyncClientHandler<>(DataNodeRequestType.CHANGE_REGION_LEADER);
leaderDistribution.forEach(
(regionGroupId, newLeaderId) -> {
- if (newLeaderId != regionRouteMap.getLeader(regionGroupId)) {
+ if (newLeaderId != -1 && newLeaderId != regionRouteMap.getLeader(regionGroupId)) {
String consensusProtocolClass;
switch (regionGroupId.getType()) {
case SchemaRegion:
@@ -318,7 +318,8 @@ public class RouteBalancer {
});
if (requestId.get() > 0) {
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ // Don't retry ChangeLeader request
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler, 1);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
index e07bd3e288..8e7fabe9b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -323,7 +323,7 @@ public class MinCostFlowLeaderBalancer implements ILeaderBalancer {
}
}
if (!matchLeader) {
- result.put(regionGroupId, regionLeaderMap.get(regionGroupId));
+ result.put(regionGroupId, regionLeaderMap.getOrDefault(regionGroupId, -1));
}
});
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index d1ad916399..de89381bdc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -197,7 +197,7 @@ public class PartitionManager {
unassignedSchemaPartitionSlotsCountMap.put(
storageGroup, unassignedSchemaPartitionSlots.size()));
TSStatus status =
- extendRegionsIfNecessary(
+ extendRegionGroupIfNecessary(
unassignedSchemaPartitionSlotsCountMap, TConsensusGroupType.SchemaRegion);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Return an error code if Region extension failed
@@ -274,7 +274,7 @@ public class PartitionManager {
unassignedDataPartitionSlotsCountMap.put(
storageGroup, unassignedDataPartitionSlots.size()));
TSStatus status =
- extendRegionsIfNecessary(
+ extendRegionGroupIfNecessary(
unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Return an error code if Region extension failed
@@ -318,14 +318,14 @@ public class PartitionManager {
// ======================================================
/**
- * Allocate more Regions to StorageGroups who have too many slots.
+ * Allocate more RegionGroup to the specified StorageGroups if necessary.
*
* @param unassignedPartitionSlotsCountMap Map<StorageGroup, unassigned Partition count>
* @param consensusGroupType SchemaRegion or DataRegion
- * @return SUCCESS_STATUS when Region extension successful; NOT_ENOUGH_DATA_NODE when there are
- * not enough DataNodes; STORAGE_GROUP_NOT_EXIST when some StorageGroups don't exist
+ * @return SUCCESS_STATUS when RegionGroup extension successful; NOT_ENOUGH_DATA_NODE when there
+ * are not enough DataNodes; STORAGE_GROUP_NOT_EXIST when some StorageGroups don't exist
*/
- private TSStatus extendRegionsIfNecessary(
+ private TSStatus extendRegionGroupIfNecessary(
Map<String, Integer> unassignedPartitionSlotsCountMap,
TConsensusGroupType consensusGroupType) {
TSStatus result = new TSStatus();
@@ -348,11 +348,11 @@ public class PartitionManager {
getClusterSchemaManager().getMaxRegionGroupNum(storageGroup, consensusGroupType);
float maxSlotCount = CONF.getSeriesPartitionSlotNum();
- /* Region extension is required in the following cases */
+ /* RegionGroup extension is required in the following cases */
// 1. The number of current RegionGroup of the StorageGroup is less than the least number
int leastRegionGroupNum =
TConsensusGroupType.SchemaRegion.equals(consensusGroupType)
- ? 1
+ ? CONF.getLeastSchemaRegionGroupNum()
: CONF.getLeastDataRegionGroupNum();
if (allocatedRegionGroupCount < leastRegionGroupNum) {
// Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index aa06f14d94..147bff2b27 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.it.env;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
@@ -241,7 +242,7 @@ public abstract class AbstractEnv implements BaseEnv {
if (flag) {
Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
for (String status : nodeStatus.values()) {
- if (!status.equals("Running")) {
+ if (NodeStatus.Unknown.getStatus().equals(status)) {
flag = false;
break;
}
@@ -257,7 +258,10 @@ public abstract class AbstractEnv implements BaseEnv {
}
TimeUnit.SECONDS.sleep(1L);
}
- throw lastException;
+
+ if (lastException != null) {
+ throw lastException;
+ }
}
@Override
@@ -436,7 +440,7 @@ public abstract class AbstractEnv implements BaseEnv {
for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
try {
SyncConfigNodeIServiceClient client =
- clientManager.borrowClient(
+ clientManager.purelyBorrowClient(
new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()));
TShowClusterResp resp = client.showCluster();
@@ -507,6 +511,45 @@ public abstract class AbstractEnv implements BaseEnv {
configNodeWrapperList.get(index).stop();
}
+ @Override
+ public DataNodeWrapper getDataNodeWrapper(int index) {
+ return dataNodeWrapperList.get(index);
+ }
+
+ @Override
+ public void registerNewDataNode() {
+ // Config new DataNode
+ DataNodeWrapper newDataNodeWrapper =
+ new DataNodeWrapper(
+ configNodeWrapperList.get(0).getIpAndPortString(),
+ getTestClassName(),
+ getTestMethodName(),
+ EnvUtils.searchAvailablePorts());
+ dataNodeWrapperList.add(newDataNodeWrapper);
+ newDataNodeWrapper.createDir();
+ newDataNodeWrapper.changeConfig(ConfigFactory.getConfig().getEngineProperties());
+
+ // Start new DataNode
+ List<String> dataNodeEndpoints =
+ Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
+ RequestDelegate<Void> dataNodesDelegate =
+ new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+ dataNodesDelegate.addRequest(
+ () -> {
+ newDataNodeWrapper.start();
+ return null;
+ });
+ try {
+ dataNodesDelegate.requestAll();
+ } catch (SQLException e) {
+ logger.error("Start dataNodes failed", e);
+ fail();
+ }
+
+ // Test whether register success
+ testWorking();
+ }
+
@Override
public void startDataNode(int index) {
dataNodeWrapperList.get(index).start();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index 49cdd9319b..dd7e67d9b6 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -30,7 +30,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
private final String targetConfigNode;
private int mppDataExchangePort;
private int internalPort;
- private String internal_address;
+ private final String internalAddress;
private final int dataRegionConsensusPort;
private final int schemaRegionConsensusPort;
private final int mqttPort;
@@ -39,7 +39,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
String targetConfigNode, String testClassName, String testMethodName, int[] portList) {
super(testClassName, testMethodName, portList);
this.targetConfigNode = targetConfigNode;
- this.internal_address = super.getIp();
+ this.internalAddress = super.getIp();
this.mppDataExchangePort = portList[1];
this.internalPort = portList[2];
this.dataRegionConsensusPort = portList[3];
@@ -51,7 +51,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
protected void updateConfig(Properties properties) {
properties.setProperty(IoTDBConstant.DN_RPC_ADDRESS, super.getIp());
properties.setProperty(IoTDBConstant.DN_RPC_PORT, String.valueOf(super.getPort()));
- properties.setProperty(IoTDBConstant.DN_INTERNAL_ADDRESS, this.internal_address);
+ properties.setProperty(IoTDBConstant.DN_INTERNAL_ADDRESS, this.internalAddress);
properties.setProperty(IoTDBConstant.DN_INTERNAL_PORT, String.valueOf(this.internalPort));
properties.setProperty("dn_mpp_data_exchange_port", String.valueOf(this.mppDataExchangePort));
properties.setProperty(
@@ -133,6 +133,10 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
this.mppDataExchangePort = mppDataExchangePort;
}
+ public String getInternalAddress() {
+ return internalAddress;
+ }
+
public int getInternalPort() {
return internalPort;
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 39ee0c3e58..bc54f67ca1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -242,6 +242,13 @@ public class MppConfig implements BaseConfig {
return this;
}
+ @Override
+ public BaseConfig setSeriesPartitionSlotNum(int seriesPartitionSlotNum) {
+ confignodeProperties.setProperty(
+ "series_partition_slot_num", String.valueOf(seriesPartitionSlotNum));
+ return this;
+ }
+
@Override
public BaseConfig setTimePartitionInterval(long timePartitionInterval) {
confignodeProperties.setProperty(
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index c8cc6bb661..bb3b1340e2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -176,22 +176,32 @@ public class RemoteServerEnv implements BaseEnv {
@Override
public void startConfigNode(int index) {
- getConfigNodeWrapperList().get(index).start();
+ throw new UnsupportedOperationException();
}
@Override
public void shutdownConfigNode(int index) {
- getConfigNodeWrapperList().get(index).stop();
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataNodeWrapper getDataNodeWrapper(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void registerNewDataNode() {
+ throw new UnsupportedOperationException();
}
@Override
public void startDataNode(int index) {
- getDataNodeWrapperList().get(index).start();
+ throw new UnsupportedOperationException();
}
@Override
public void shutdownDataNode(int index) {
- getDataNodeWrapperList().get(index).stop();
+ throw new UnsupportedOperationException();
}
@Override
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 97bd4eb24e..e5a237317d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -247,6 +247,14 @@ public interface BaseConfig {
return 1;
}
+ default BaseConfig setSeriesPartitionSlotNum(int seriesPartitionSlotNum) {
+ return this;
+ }
+
+ default int getSeriesPartitionSlotNum() {
+ return 10000;
+ }
+
default BaseConfig setTimePartitionInterval(long timePartitionInterval) {
return this;
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 674bfbe55e..04ae16ba74 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -189,12 +189,22 @@ public interface BaseEnv {
/** @return The index of ConfigNode-Leader in configNodeWrapperList */
int getLeaderConfigNodeIndex() throws IOException, InterruptedException;
+ /** Start an existed ConfigNode */
void startConfigNode(int index);
+ /** Shutdown an existed ConfigNode */
void shutdownConfigNode(int index);
+ /** @return The TDataNodeLocation of the specified DataNode */
+ DataNodeWrapper getDataNodeWrapper(int index);
+
+ /** Register a new DataNode */
+ void registerNewDataNode();
+
+ /** Start an existed DataNode */
void startDataNode(int index);
+ /** Shutdown an existed DataNode */
void shutdownDataNode(int index);
int getMqttPort();
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
index ae463159f1..a519c9628a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
@@ -169,7 +169,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
throws IOException, InterruptedException, TException, IllegalPathException {
final int testConfigNodeNum = 1;
final int testDataNodeNum = 3;
- final int retryNum = 40;
+ final int retryNum = 100;
EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
TSStatus status;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
new file mode 100644
index 0000000000..0b77cfbf86
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.DataNodeWrapper;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBPartitionDurableIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBPartitionDurableIT.class);
+
+ private static String originalConfigNodeConsensusProtocolClass;
+ private static String originalSchemaRegionConsensusProtocolClass;
+ private static String originalDataRegionConsensusProtocolClass;
+ private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+ private static int originalSchemaReplicationFactor;
+ private static int originalDataReplicationFactor;
+ private static final int testReplicationFactor = 3;
+
+ private static long originalTimePartitionInterval;
+ private static final long testTimePartitionInterval = 604800000;
+
+ private static final int testDataNodeId = 0;
+ private static final String sg = "root.sg";
+ final String d0 = sg + ".d0.s";
+ final String d1 = sg + ".d1.s";
+ private static final int testSeriesPartitionBatchSize = 1;
+ private static final int testTimePartitionBatchSize = 1;
+ private static final TEndPoint defaultEndPoint = new TEndPoint("-1", -1);
+ private static final TDataNodeLocation defaultDataNode =
+ new TDataNodeLocation(
+ -1,
+ new TEndPoint(defaultEndPoint),
+ new TEndPoint(defaultEndPoint),
+ new TEndPoint(defaultEndPoint),
+ new TEndPoint(defaultEndPoint),
+ new TEndPoint(defaultEndPoint));
+
+ @Before
+ public void setUp() throws Exception {
+ originalConfigNodeConsensusProtocolClass =
+ ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+ originalSchemaRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
+ originalDataRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
+ ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+
+ originalSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
+ originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
+ ConfigFactory.getConfig().setSchemaReplicationFactor(testReplicationFactor);
+ ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+
+ originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+ ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+
+ // Init 1C3D environment
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+
+ setStorageGroup();
+ }
+
+ private void setStorageGroup() throws IOException, InterruptedException, TException {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ TSetStorageGroupReq setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg));
+ TSStatus status = client.setStorageGroup(setStorageGroupReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+
+ ConfigFactory.getConfig()
+ .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+
+ ConfigFactory.getConfig().setSchemaReplicationFactor(originalSchemaReplicationFactor);
+ ConfigFactory.getConfig().setDataReplicationFactor(originalDataReplicationFactor);
+
+ ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+ }
+
+ @Test
+ public void testRemovingDataNode()
+ throws IOException, InterruptedException, TException, IllegalPathException {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ /* Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartition and return */
+ TSchemaPartitionReq schemaPartitionReq =
+ new TSchemaPartitionReq()
+ .setPathPatternTree(ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d0}));
+ TSchemaPartitionTableResp schemaPartitionTableResp =
+ client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
+ Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
+ schemaPartitionTableResp.getSchemaPartitionTable();
+ // Successfully create a SchemaPartition
+ Assert.assertTrue(schemaPartitionTable.containsKey(sg));
+ Assert.assertEquals(1, schemaPartitionTable.get(sg).size());
+
+ /* Check Region distribution */
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ // Create exactly one RegionGroup
+ Assert.assertEquals(3, showRegionResp.getRegionInfoListSize());
+ // Each DataNode has exactly one Region
+ Set<Integer> dataNodeIdSet = new HashSet<>();
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(regionInfo -> dataNodeIdSet.add(regionInfo.getDataNodeId()));
+ Assert.assertEquals(3, dataNodeIdSet.size());
+
+ /* Change the NodeStatus of the test DataNode to Removing */
+ TSetDataNodeStatusReq setDataNodeStatusReq = new TSetDataNodeStatusReq();
+ DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId);
+ setDataNodeStatusReq.setTargetDataNode(
+ new TDataNodeLocation(defaultDataNode)
+ .setInternalEndPoint(
+ new TEndPoint()
+ .setIp(dataNodeWrapper.getInternalAddress())
+ .setPort(dataNodeWrapper.getInternalPort())));
+ setDataNodeStatusReq.setStatus(NodeStatus.Removing.getStatus());
+ client.setDataNodeStatus(setDataNodeStatusReq);
+ // Waiting for heartbeat update
+ while (true) {
+ AtomicBoolean containRemoving = new AtomicBoolean(false);
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ showDataNodesResp
+ .getDataNodesInfoList()
+ .forEach(
+ dataNodeInfo -> {
+ if (NodeStatus.Removing.getStatus().equals(dataNodeInfo.getStatus())) {
+ containRemoving.set(true);
+ }
+ });
+
+ if (containRemoving.get()) {
+ break;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ /* Test getOrCreateSchemaPartition, the result should be NO_AVAILABLE_REGION_GROUP */
+ schemaPartitionReq =
+ new TSchemaPartitionReq()
+ .setPathPatternTree(ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d1}));
+ schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
+
+ /* Register a new DataNode */
+ EnvFactory.getEnv().registerNewDataNode();
+
+ /* Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartition and return */
+ schemaPartitionReq =
+ new TSchemaPartitionReq()
+ .setPathPatternTree(ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d1}));
+ schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
+ schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
+ // Successfully create a SchemaPartition
+ Assert.assertTrue(schemaPartitionTable.containsKey(sg));
+ Assert.assertEquals(1, schemaPartitionTable.get(sg).size());
+
+ /* Check Region distribution */
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ // There should be 2 RegionGroups
+ Assert.assertEquals(6, showRegionResp.getRegionInfoListSize());
+ // The new RegionGroup should keep away from the Removing DataNode
+ Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo ->
+ regionCounter
+ .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0))
+ .getAndIncrement());
+ dataNodeIdSet.forEach(dataNodeId -> regionCounter.get(dataNodeId).getAndDecrement());
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showDataNodesResp.getStatus().getCode());
+ regionCounter.forEach(
+ (dataNodeId, regionCount) -> {
+ String nodeStatus =
+ showDataNodesResp.getDataNodesInfoList().stream()
+ .filter(dataNodeInfo -> dataNodeInfo.getDataNodeId() == dataNodeId)
+ .findFirst()
+ .orElse(new TDataNodeInfo().setStatus("ERROR"))
+ .getStatus();
+ if (NodeStatus.Removing.getStatus().equals(nodeStatus)) {
+ Assert.assertEquals(0, regionCount.get());
+ } else if (NodeStatus.Running.getStatus().equals(nodeStatus)) {
+ Assert.assertEquals(1, regionCount.get());
+ } else {
+ Assert.fail();
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testReadOnlyDataNode() throws IOException, InterruptedException, TException {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ /* Test getOrCreateDataPartition, ConfigNode should create DataPartition and return */
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg,
+ 0,
+ testSeriesPartitionBatchSize,
+ 0,
+ testTimePartitionBatchSize,
+ testTimePartitionInterval);
+ TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ sg,
+ 0,
+ testSeriesPartitionBatchSize,
+ 0,
+ testTimePartitionBatchSize,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+
+ /* Check Region distribution */
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ // Create exactly one RegionGroup
+ Assert.assertEquals(3, showRegionResp.getRegionInfoListSize());
+ // Each DataNode has exactly one Region
+ Set<Integer> dataNodeIdSet = new HashSet<>();
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(regionInfo -> dataNodeIdSet.add(regionInfo.getDataNodeId()));
+ Assert.assertEquals(3, dataNodeIdSet.size());
+
+ /* Change the NodeStatus of the test DataNode to ReadOnly */
+ TSetDataNodeStatusReq setDataNodeStatusReq = new TSetDataNodeStatusReq();
+ DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(testDataNodeId);
+ setDataNodeStatusReq.setTargetDataNode(
+ new TDataNodeLocation(defaultDataNode)
+ .setInternalEndPoint(
+ new TEndPoint()
+ .setIp(dataNodeWrapper.getInternalAddress())
+ .setPort(dataNodeWrapper.getInternalPort())));
+ setDataNodeStatusReq.setStatus(NodeStatus.ReadOnly.getStatus());
+ client.setDataNodeStatus(setDataNodeStatusReq);
+ // Waiting for heartbeat update
+ while (true) {
+ AtomicBoolean containReadOnly = new AtomicBoolean(false);
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ showDataNodesResp
+ .getDataNodesInfoList()
+ .forEach(
+ dataNodeInfo -> {
+ if (NodeStatus.ReadOnly.getStatus().equals(dataNodeInfo.getStatus())) {
+ containReadOnly.set(true);
+ }
+ });
+
+ if (containReadOnly.get()) {
+ break;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ /* Test getOrCreateDataPartition, the result should be NO_ENOUGH_DATANODE */
+ partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg,
+ 1,
+ 1 + testSeriesPartitionBatchSize,
+ 1,
+ 1 + testTimePartitionBatchSize,
+ testTimePartitionInterval);
+ dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ dataPartitionTableResp = client.getOrCreateDataPartitionTable(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+
+ /* Register a new DataNode */
+ EnvFactory.getEnv().registerNewDataNode();
+
+ /* Test getOrCreateDataPartition, ConfigNode should create DataPartition and return */
+ partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg,
+ 1,
+ 1 + testSeriesPartitionBatchSize,
+ 1,
+ 1 + testTimePartitionBatchSize,
+ testTimePartitionInterval);
+ dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ dataPartitionTableResp = client.getOrCreateDataPartitionTable(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ sg,
+ 1,
+ 1 + testSeriesPartitionBatchSize,
+ 1,
+ 1 + testTimePartitionBatchSize,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+
+ /* Check Region distribution */
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ // There should be 2 RegionGroups
+ Assert.assertEquals(6, showRegionResp.getRegionInfoListSize());
+ // The new RegionGroup should keep away from the ReadOnly DataNode
+ Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo ->
+ regionCounter
+ .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0))
+ .getAndIncrement());
+ dataNodeIdSet.forEach(dataNodeId -> regionCounter.get(dataNodeId).getAndDecrement());
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showDataNodesResp.getStatus().getCode());
+ regionCounter.forEach(
+ (dataNodeId, regionCount) -> {
+ String nodeStatus =
+ showDataNodesResp.getDataNodesInfoList().stream()
+ .filter(dataNodeInfo -> dataNodeInfo.getDataNodeId() == dataNodeId)
+ .findFirst()
+ .orElse(new TDataNodeInfo().setStatus("ERROR"))
+ .getStatus();
+ if (NodeStatus.ReadOnly.getStatus().equals(nodeStatus)) {
+ Assert.assertEquals(0, regionCount.get());
+ } else if (NodeStatus.Running.getStatus().equals(nodeStatus)) {
+ Assert.assertEquals(1, regionCount.get());
+ } else {
+ Assert.fail();
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testUnknownDataNode() throws IOException, TException, InterruptedException {
+ // Shutdown a DataNode, the ConfigNode should still be able to create RegionGroup
+ EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg,
+ 0,
+ testSeriesPartitionBatchSize,
+ 0,
+ testTimePartitionBatchSize,
+ testTimePartitionInterval);
+ TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ TDataPartitionTableResp dataPartitionTableResp = null;
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable in Win8 environment
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ sg,
+ 0,
+ testSeriesPartitionBatchSize,
+ 0,
+ testTimePartitionBatchSize,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+
+ // Check Region count
+ int runningCnt = 0;
+ int unknownCnt = 0;
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+ runningCnt += 1;
+ } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+ unknownCnt += 1;
+ }
+ }
+ // The runningCnt should be exactly twice as the unknownCnt
+ // since there exists one DataNode is shutdown
+ Assert.assertEquals(unknownCnt * 2, runningCnt);
+
+ // Wait for shutdown check
+ TShowClusterResp showClusterResp;
+ while (true) {
+ AtomicBoolean containUnknown = new AtomicBoolean(false);
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ showDataNodesResp
+ .getDataNodesInfoList()
+ .forEach(
+ dataNodeInfo -> {
+ if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+ containUnknown.set(true);
+ }
+ });
+
+ if (containUnknown.get()) {
+ break;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ runningCnt = 0;
+ unknownCnt = 0;
+ showClusterResp = client.showCluster();
+ for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+ if (NodeStatus.Running.getStatus()
+ .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+ runningCnt += 1;
+ } else if (NodeStatus.Unknown.getStatus()
+ .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+ unknownCnt += 1;
+ }
+ }
+ Assert.assertEquals(2, runningCnt);
+ Assert.assertEquals(1, unknownCnt);
+
+ // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
+ partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ sg,
+ 1,
+ 1 + testSeriesPartitionBatchSize,
+ 1,
+ 1 + testTimePartitionBatchSize,
+ testTimePartitionInterval);
+ dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable in Win8 environment
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ sg,
+ 1,
+ 1 + testSeriesPartitionBatchSize,
+ 1,
+ 1 + testTimePartitionBatchSize,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+
+ // Check Region count and status
+ runningCnt = 0;
+ unknownCnt = 0;
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+ for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+ runningCnt += 1;
+ } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+ unknownCnt += 1;
+ }
+ }
+ // The runningCnt should be exactly twice as the unknownCnt
+ // since there exists one DataNode is shutdown
+ Assert.assertEquals(unknownCnt * 2, runningCnt);
+
+ EnvFactory.getEnv().startDataNode(testDataNodeId);
+ // Wait for heartbeat check
+ while (true) {
+ boolean containUnknown = false;
+ showClusterResp = client.showCluster();
+ for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+ if (NodeStatus.Unknown.getStatus()
+ .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+ containUnknown = true;
+ break;
+ }
+ }
+ if (!containUnknown) {
+ break;
+ }
+ }
+
+ // All Regions should alive after the testDataNode is restarted
+ boolean allRunning = true;
+ for (int retry = 0; retry < 30; retry++) {
+ allRunning = true;
+ showRegionResp = client.showRegion(new TShowRegionReq());
+ for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (!RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+ allRunning = false;
+ break;
+ }
+ }
+
+ if (allRunning) {
+ break;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ Assert.assertTrue(allRunning);
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableTest.java
deleted file mode 100644
index 2219e5b267..0000000000
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableTest.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.it.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.it.env.ConfigFactory;
-import org.apache.iotdb.it.env.EnvFactory;
-import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.ClusterIT;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.TException;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-@RunWith(IoTDBTestRunner.class)
-@Category({ClusterIT.class})
-public class IoTDBPartitionDurableTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBPartitionDurableTest.class);
-
- private static String originalConfigNodeConsensusProtocolClass;
- private static String originalSchemaRegionConsensusProtocolClass;
- private static String originalDataRegionConsensusProtocolClass;
- private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
-
- private static int originalSchemaReplicationFactor;
- private static int originalDataReplicationFactor;
- private static final int testReplicationFactor = 3;
-
- private static long originalTimePartitionInterval;
- private static final long testTimePartitionInterval = 604800000;
-
- private static final String sg = "root.sg";
-
- @BeforeClass
- public static void setUp() throws Exception {
- originalConfigNodeConsensusProtocolClass =
- ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
- originalSchemaRegionConsensusProtocolClass =
- ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
- originalDataRegionConsensusProtocolClass =
- ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
- ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
- ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
- ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
-
- originalSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
- originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
- ConfigFactory.getConfig().setSchemaReplicationFactor(testReplicationFactor);
- ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
-
- originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
- ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
-
- // Init 1C3D environment
- EnvFactory.getEnv().initClusterEnvironment(1, 3);
- }
-
- @AfterClass
- public static void tearDown() {
- EnvFactory.getEnv().cleanAfterClass();
-
- ConfigFactory.getConfig()
- .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
- ConfigFactory.getConfig()
- .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
- ConfigFactory.getConfig()
- .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
-
- ConfigFactory.getConfig().setSchemaReplicationFactor(originalSchemaReplicationFactor);
- ConfigFactory.getConfig().setDataReplicationFactor(originalDataReplicationFactor);
-
- ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
- }
-
- // TODO: Optimize this in IOTDB-4334
- @Test
- public void testPartitionDurable() throws IOException, TException, InterruptedException {
- final int testDataNodeId = 0;
- final int seriesPartitionBatchSize = 10;
- final int timePartitionBatchSize = 10;
-
- // Shutdown the first DataNode
- EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
-
- try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- final String sg0 = sg + 0;
- final String sg1 = sg + 1;
-
- // Set StorageGroup, the result should be success
- TSetStorageGroupReq setStorageGroupReq =
- new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
- TSStatus status = client.setStorageGroup(setStorageGroupReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
- status = client.setStorageGroup(setStorageGroupReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
- ConfigNodeTestUtils.constructPartitionSlotsMap(
- sg0,
- 0,
- seriesPartitionBatchSize,
- 0,
- timePartitionBatchSize,
- testTimePartitionInterval);
- TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- TDataPartitionTableResp dataPartitionTableResp = null;
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- ConfigNodeTestUtils.checkDataPartitionTable(
- sg0,
- 0,
- seriesPartitionBatchSize,
- 0,
- timePartitionBatchSize,
- testTimePartitionInterval,
- dataPartitionTableResp.getDataPartitionTable());
-
- // Check Region count
- int runningCnt = 0;
- int unknownCnt = 0;
- TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
- runningCnt += 1;
- } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
- unknownCnt += 1;
- }
- }
- // The runningCnt should be exactly twice as the unknownCnt
- // since there exists one DataNode is shutdown
- Assert.assertEquals(unknownCnt * 2, runningCnt);
-
- // Wait for shutdown check
- TShowClusterResp showClusterResp;
- while (true) {
- boolean containUnknown = false;
- showClusterResp = client.showCluster();
- for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
- if (NodeStatus.Unknown.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- containUnknown = true;
- break;
- }
- }
- if (containUnknown) {
- break;
- }
- }
- runningCnt = 0;
- unknownCnt = 0;
- showClusterResp = client.showCluster();
- for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
- if (NodeStatus.Running.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- runningCnt += 1;
- } else if (NodeStatus.Unknown.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- unknownCnt += 1;
- }
- }
- Assert.assertEquals(2, runningCnt);
- Assert.assertEquals(1, unknownCnt);
-
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
- partitionSlotsMap =
- ConfigNodeTestUtils.constructPartitionSlotsMap(
- sg1,
- 0,
- seriesPartitionBatchSize,
- 0,
- timePartitionBatchSize,
- testTimePartitionInterval);
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
- for (int retry = 0; retry < 5; retry++) {
- // Build new Client since it's unstable in Win8 environment
- try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
- break;
- }
- } catch (Exception e) {
- // Retry sometimes in order to avoid request timeout
- LOGGER.error(e.getMessage());
- TimeUnit.SECONDS.sleep(1);
- }
- }
- Assert.assertNotNull(dataPartitionTableResp);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- ConfigNodeTestUtils.checkDataPartitionTable(
- sg1,
- 0,
- seriesPartitionBatchSize,
- 0,
- timePartitionBatchSize,
- testTimePartitionInterval,
- dataPartitionTableResp.getDataPartitionTable());
-
- // Check Region count and status
- runningCnt = 0;
- unknownCnt = 0;
- showRegionResp = client.showRegion(new TShowRegionReq());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
- runningCnt += 1;
- } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
- unknownCnt += 1;
- }
- }
- // The runningCnt should be exactly twice as the unknownCnt
- // since there exists one DataNode is shutdown
- Assert.assertEquals(unknownCnt * 2, runningCnt);
-
- EnvFactory.getEnv().startDataNode(testDataNodeId);
- // Wait for heartbeat check
- while (true) {
- boolean containUnknown = false;
- showClusterResp = client.showCluster();
- for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
- if (NodeStatus.Unknown.getStatus()
- .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
- containUnknown = true;
- break;
- }
- }
- if (!containUnknown) {
- break;
- }
- }
-
- // All Regions should alive after the testDataNode is restarted
- boolean allRunning = true;
- for (int retry = 0; retry < 30; retry++) {
- allRunning = true;
- showRegionResp = client.showRegion(new TShowRegionReq());
- for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
- if (!RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
- allRunning = false;
- break;
- }
- }
- if (allRunning) {
- break;
- }
-
- TimeUnit.SECONDS.sleep(1);
- }
- Assert.assertTrue(allRunning);
- }
- }
-}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
index 72ab0a074f..6bf8b3d237 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
@@ -86,17 +86,19 @@ public class IoTDBPartitionGetterIT {
private static int originalDataReplicationFactor;
private static final int testReplicationFactor = 3;
+ private static int originalSeriesPartitionSlotNum;
+
private static long originalTimePartitionInterval;
private static final long testTimePartitionInterval = 604800000;
protected static int originalLeastDataRegionGroupNum;
- private static final int testLeastDataRegionGroupNum = 10;
+ private static final int testLeastDataRegionGroupNum = 3;
private static final String sg = "root.sg";
- private static final int storageGroupNum = 5;
- private static final int seriesPartitionSlotsNum = 10000;
- private static final int seriesPartitionBatchSize = 100;
- private static final int timePartitionSlotsNum = 10;
+ private static final int storageGroupNum = 2;
+ private static final int testSeriesPartitionSlotNum = 1000;
+ private static final int seriesPartitionBatchSize = 10;
+ private static final int testTimePartitionSlotsNum = 10;
private static final int timePartitionBatchSize = 10;
@BeforeClass
@@ -116,6 +118,9 @@ public class IoTDBPartitionGetterIT {
CONF.setSchemaReplicationFactor(testReplicationFactor);
CONF.setDataReplicationFactor(testReplicationFactor);
+ originalSeriesPartitionSlotNum = CONF.getSeriesPartitionSlotNum();
+ CONF.setSeriesPartitionSlotNum(testSeriesPartitionSlotNum);
+
originalTimePartitionInterval = CONF.getTimePartitionInterval();
CONF.setTimePartitionInterval(testTimePartitionInterval);
@@ -168,8 +173,8 @@ public class IoTDBPartitionGetterIT {
/* Create DataPartitions */
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
- for (int j = 0; j < seriesPartitionSlotsNum; j += seriesPartitionBatchSize) {
- for (long k = 0; k < timePartitionSlotsNum; k += timePartitionBatchSize) {
+ for (int j = 0; j < testSeriesPartitionSlotNum; j += seriesPartitionBatchSize) {
+ for (long k = 0; k < testTimePartitionSlotsNum; k += timePartitionBatchSize) {
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
storageGroup,
@@ -227,6 +232,7 @@ public class IoTDBPartitionGetterIT {
CONF.setSchemaReplicationFactor(originalSchemaReplicationFactor);
CONF.setDataReplicationFactor(originalDataReplicationFactor);
+ CONF.setSeriesPartitionSlotNum(originalSeriesPartitionSlotNum);
CONF.setTimePartitionInterval(originalTimePartitionInterval);
}
@@ -320,8 +326,8 @@ public class IoTDBPartitionGetterIT {
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
- for (int j = 0; j < seriesPartitionSlotsNum; j += seriesPartitionBatchSize) {
- for (long k = 0; k < timePartitionSlotsNum; k += timePartitionBatchSize) {
+ for (int j = 0; j < testSeriesPartitionSlotNum; j += seriesPartitionBatchSize) {
+ for (long k = 0; k < testTimePartitionSlotsNum; k += timePartitionBatchSize) {
partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
storageGroup,
@@ -399,7 +405,7 @@ public class IoTDBPartitionGetterIT {
getRegionIdResp = client.getRegionId(getRegionIdReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getRegionIdResp.status.getCode());
- Assert.assertEquals(10, getRegionIdResp.getDataRegionIdListSize());
+ Assert.assertEquals(testLeastDataRegionGroupNum, getRegionIdResp.getDataRegionIdListSize());
final String d00 = sg0 + ".d0.s";
final String d01 = sg0 + ".d1.s";
@@ -461,14 +467,16 @@ public class IoTDBPartitionGetterIT {
getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getSeriesSlotListResp.status.getCode());
- Assert.assertEquals(seriesPartitionSlotsNum, getSeriesSlotListResp.getSeriesSlotListSize());
+ Assert.assertEquals(
+ testSeriesPartitionSlotNum, getSeriesSlotListResp.getSeriesSlotListSize());
getSeriesSlotListReq.setType(TConsensusGroupType.ConfigNodeRegion);
getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getSeriesSlotListResp.status.getCode());
- Assert.assertEquals(seriesPartitionSlotsNum, getSeriesSlotListResp.getSeriesSlotListSize());
+ Assert.assertEquals(
+ testSeriesPartitionSlotNum, getSeriesSlotListResp.getSeriesSlotListSize());
getSeriesSlotListReq.setType(TConsensusGroupType.SchemaRegion);
@@ -482,7 +490,8 @@ public class IoTDBPartitionGetterIT {
getSeriesSlotListResp = client.getSeriesSlotList(getSeriesSlotListReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getSeriesSlotListResp.status.getCode());
- Assert.assertEquals(seriesPartitionSlotsNum, getSeriesSlotListResp.getSeriesSlotListSize());
+ Assert.assertEquals(
+ testSeriesPartitionSlotNum, getSeriesSlotListResp.getSeriesSlotListSize());
}
}
@@ -502,7 +511,7 @@ public class IoTDBPartitionGetterIT {
nodeManagementResp = client.getSchemaNodeManagementPartition(nodeManagementReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
- Assert.assertEquals(5, nodeManagementResp.getMatchedNodeSize());
+ Assert.assertEquals(storageGroupNum, nodeManagementResp.getMatchedNodeSize());
Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
Assert.assertEquals(2, nodeManagementResp.getSchemaRegionMapSize());
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
similarity index 86%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index fb9952e29f..52deb6d02f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -53,10 +53,9 @@ import java.util.concurrent.TimeUnit;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBPartitionInheritPolicyTest {
+public class IoTDBPartitionInheritPolicyIT {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(IoTDBPartitionInheritPolicyTest.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBPartitionInheritPolicyIT.class);
private static boolean originalEnableDataPartitionInheritPolicy;
private static final boolean testEnableDataPartitionInheritPolicy = true;
@@ -68,13 +67,17 @@ public class IoTDBPartitionInheritPolicyTest {
private static int originalDataReplicationFactor;
private static final int testReplicationFactor = 3;
+ private static int originalSeriesPartitionSlotNum;
+
private static long originalTimePartitionInterval;
private static final long testTimePartitionInterval = 604800000;
private static final String sg = "root.sg";
- private static final int storageGroupNum = 5;
- private static final int seriesPartitionSlotsNum = 10000;
- private static final int timePartitionSlotsNum = 10;
+ private static final int storageGroupNum = 2;
+ private static final int testSeriesPartitionSlotNum = 1000;
+ private static final int seriesPartitionBatchSize = 10;
+ private static final int testTimePartitionSlotsNum = 10;
+ private static final int timePartitionBatchSize = 10;
@BeforeClass
public static void setUp() throws Exception {
@@ -91,6 +94,9 @@ public class IoTDBPartitionInheritPolicyTest {
originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+ originalSeriesPartitionSlotNum = ConfigFactory.getConfig().getSeriesPartitionSlotNum();
+ ConfigFactory.getConfig().setSeriesPartitionSlotNum(testSeriesPartitionSlotNum * 10);
+
originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
@@ -117,14 +123,13 @@ public class IoTDBPartitionInheritPolicyTest {
ConfigFactory.getConfig()
.setEnableDataPartitionInheritPolicy(originalEnableDataPartitionInheritPolicy);
ConfigFactory.getConfig().setDataReplicationFactor(originalDataReplicationFactor);
+ ConfigFactory.getConfig().setSeriesPartitionSlotNum(originalSeriesPartitionSlotNum);
ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
}
@Test
public void testDataPartitionInheritPolicy()
throws TException, IOException, InterruptedException {
- final int seriesPartitionBatchSize = 100;
- final int timePartitionBatchSize = 10;
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -134,8 +139,8 @@ public class IoTDBPartitionInheritPolicyTest {
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
- for (int j = 0; j < seriesPartitionSlotsNum; j += seriesPartitionBatchSize) {
- for (long k = 0; k < timePartitionSlotsNum; k += timePartitionBatchSize) {
+ for (int j = 0; j < testSeriesPartitionSlotNum; j += seriesPartitionBatchSize) {
+ for (long k = 0; k < testTimePartitionSlotsNum; k += timePartitionBatchSize) {
partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
storageGroup,
@@ -176,7 +181,8 @@ public class IoTDBPartitionInheritPolicyTest {
// All Timeslots belonging to the same SeriesSlot are allocated to the same
// DataRegionGroup
Assert.assertEquals(
- regionInfo.getSeriesSlots() * timePartitionSlotsNum, regionInfo.getTimeSlots());
+ regionInfo.getSeriesSlots() * testTimePartitionSlotsNum,
+ regionInfo.getTimeSlots());
});
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 7e46ef360a..608961bb57 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -185,22 +185,32 @@ public class StandaloneEnv implements BaseEnv {
@Override
public void startConfigNode(int index) {
- // Do nothing
+ throw new UnsupportedOperationException();
}
@Override
public void shutdownConfigNode(int index) {
- // Do nothing
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataNodeWrapper getDataNodeWrapper(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void registerNewDataNode() {
+ throw new UnsupportedOperationException();
}
@Override
public void startDataNode(int index) {
- // Do nothing
+ throw new UnsupportedOperationException();
}
@Override
public void shutdownDataNode(int index) {
- // Do nothing
+ throw new UnsupportedOperationException();
}
@Override