You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/08/18 12:29:03 UTC
[iotdb] branch master updated: [IOTDB-4092] Protecting Region creation process by adding CreateRegionGroupsProcedure (#7006)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 544c60ff87 [IOTDB-4092] Protecting Region creation process by adding CreateRegionGroupsProcedure (#7006)
544c60ff87 is described below
commit 544c60ff877ecc0c7e7b2924814572adfacdc228
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Thu Aug 18 20:28:58 2022 +0800
[IOTDB-4092] Protecting Region creation process by adding CreateRegionGroupsProcedure (#7006)
---
.../async/datanode/AsyncDataNodeClientPool.java | 68 +++++++----
.../sync/datanode/SyncDataNodeClientPool.java | 13 +-
.../consensus/request/ConfigPhysicalPlan.java | 6 +-
.../consensus/request/ConfigPhysicalPlanType.java | 2 +-
.../request/write/CreateRegionGroupsPlan.java | 11 +-
...egionsPlan.java => DeleteRegionGroupsPlan.java} | 48 ++++----
.../iotdb/confignode/manager/ConfigManager.java | 17 +--
.../iotdb/confignode/manager/ConsensusManager.java | 26 ++++
.../iotdb/confignode/manager/PartitionManager.java | 67 ++++++----
.../iotdb/confignode/manager/ProcedureManager.java | 57 +++++++--
.../iotdb/confignode/manager/load/LoadManager.java | 28 ++---
.../persistence/executor/ConfigPlanExecutor.java | 63 +++++-----
.../persistence/partition/PartitionInfo.java | 47 +++++--
.../partition/StorageGroupPartitionTable.java | 9 ++
.../procedure/env/ConfigNodeProcedureEnv.java | 50 ++++++++
.../impl/CreateRegionGroupsProcedure.java | 135 +++++++++++++++++++++
.../procedure/state/CreateRegionGroupsState.java | 27 +++++
.../procedure/store/ProcedureFactory.java | 6 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 41 +++++--
19 files changed, 538 insertions(+), 183 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 08bca91b6f..cfc0df60fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.client.async.datanode;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -50,6 +51,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -231,18 +233,18 @@ public class AsyncDataNodeClientPool {
}
/**
- * Execute CreateRegionsReq asynchronously
+ * Execute CreateRegionGroupsPlan asynchronously
*
- * @param createRegionGroupsPlan CreateRegionsReq
* @param ttlMap Map<StorageGroupName, TTL>
+ * @return The RegionGroups that failed to be created
*/
- public void createRegions(
+ public Map<TConsensusGroupId, TRegionReplicaSet> createRegionGroups(
CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
// Because different requests will be sent to the same node when createRegions,
// so for CreateRegions use Map<index, TDataNodeLocation>
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
int index = 0;
- // Count the datanodes to be sent
+ // Count the DataNodes to be sent
for (List<TRegionReplicaSet> regionReplicaSets :
createRegionGroupsPlan.getRegionGroupMap().values()) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
@@ -252,7 +254,7 @@ public class AsyncDataNodeClientPool {
}
}
if (dataNodeLocationMap.isEmpty()) {
- return;
+ return new HashMap<>();
}
for (int retry = 0; retry < retryNum; retry++) {
index = 0;
@@ -298,7 +300,7 @@ public class AsyncDataNodeClientPool {
retry);
break;
default:
- return;
+ break;
}
} else {
index++;
@@ -317,6 +319,43 @@ public class AsyncDataNodeClientPool {
break;
}
}
+
+ // Filter RegionGroups that weren't created successfully
+ index = 0;
+ Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
+ for (List<TRegionReplicaSet> regionReplicaSets :
+ createRegionGroupsPlan.getRegionGroupMap().values()) {
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ if (dataNodeLocationMap.containsKey(index)) {
+ failedRegions
+ .computeIfAbsent(
+ regionReplicaSet.getRegionId(),
+ empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
+ .addToDataNodeLocations(dataNodeLocation);
+ }
+ index += 1;
+ }
+ }
+ }
+ return failedRegions;
+ }
+
+ private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet) {
+ TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ return req;
+ }
+
+ private TCreateDataRegionReq genCreateDataRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+ TCreateDataRegionReq req = new TCreateDataRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ req.setTtl(TTL);
+ return req;
}
/**
@@ -341,23 +380,6 @@ public class AsyncDataNodeClientPool {
}
}
- private TCreateSchemaRegionReq genCreateSchemaRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet) {
- TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- return req;
- }
-
- private TCreateDataRegionReq genCreateDataRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
- TCreateDataRegionReq req = new TCreateDataRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- req.setTtl(TTL);
- return req;
- }
-
/**
* Always call this interface when a DataNode is restarted or removed
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index 792ffd4640..85c21f56e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -134,14 +134,21 @@ public class SyncDataNodeClientPool {
List<TConsensusGroupId> regionIds,
Set<TRegionReplicaSet> deletedRegionSet) {
for (TConsensusGroupId regionId : regionIds) {
- LOGGER.debug("Delete region {} ", regionId);
+ LOGGER.info("Try to delete RegionReplica: {} on DataNode: {}", regionId, endPoint);
final TSStatus status =
sendSyncRequestToDataNodeWithRetry(
endPoint, regionId, DataNodeRequestType.DELETE_REGIONS);
+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("DELETE Region {} successfully", regionId);
- deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
+ LOGGER.info("Delete RegionReplica: {} on DataNode: {} successfully", regionId, endPoint);
+ } else {
+ LOGGER.warn(
+ "Failed to delete RegionReplica: {} on DataNode: {}. You might need to delete it manually",
+ regionId,
+ endPoint);
}
+
+ deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 92b03b8f1f..701fcdf67f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedurePlan;
-import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
@@ -143,8 +143,8 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case CreateRegionGroups:
req = new CreateRegionGroupsPlan();
break;
- case DeleteRegions:
- req = new DeleteRegionsPlan();
+ case DeleteRegionGroups:
+ req = new DeleteRegionGroupsPlan();
break;
case GetSchemaPartition:
req = new GetSchemaPartitionPlan();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index aa08063bd5..c579f1d3f3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -32,7 +32,7 @@ public enum ConfigPhysicalPlanType {
GetStorageGroup,
CountStorageGroup,
CreateRegionGroups,
- DeleteRegions,
+ DeleteRegionGroups,
GetSchemaPartition,
CreateSchemaPartition,
GetOrCreateSchemaPartition,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
index fa0d288d72..ca7e3fde39 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
@@ -28,21 +28,26 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.TreeMap;
/** Create regions for specific StorageGroups */
public class CreateRegionGroupsPlan extends ConfigPhysicalPlan {
// Map<StorageGroupName, List<TRegionReplicaSet>>
- private final Map<String, List<TRegionReplicaSet>> regionGroupMap;
+ protected final Map<String, List<TRegionReplicaSet>> regionGroupMap;
public CreateRegionGroupsPlan() {
super(ConfigPhysicalPlanType.CreateRegionGroups);
- this.regionGroupMap = new TreeMap<>();
+ this.regionGroupMap = new HashMap<>();
+ }
+
+ public CreateRegionGroupsPlan(ConfigPhysicalPlanType type) {
+ super(type);
+ this.regionGroupMap = new HashMap<>();
}
public Map<String, List<TRegionReplicaSet>> getRegionGroupMap() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionGroupsPlan.java
similarity index 55%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionsPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionGroupsPlan.java
index 83a00bc58c..f1a4da0332 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteRegionGroupsPlan.java
@@ -18,62 +18,60 @@
*/
package org.apache.iotdb.confignode.consensus.request.write;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class DeleteRegionsPlan extends ConfigPhysicalPlan {
+public class DeleteRegionGroupsPlan extends CreateRegionGroupsPlan {
- private final Map<String, List<TConsensusGroupId>> deleteRegionMap;
+ boolean needsDeleteInPartitionTable = true;
- public DeleteRegionsPlan() {
- super(ConfigPhysicalPlanType.DeleteRegions);
- this.deleteRegionMap = new HashMap<>();
+ public DeleteRegionGroupsPlan() {
+ super(ConfigPhysicalPlanType.DeleteRegionGroups);
}
- public void addDeleteRegion(String name, TConsensusGroupId consensusGroupId) {
- deleteRegionMap.computeIfAbsent(name, empty -> new ArrayList<>()).add(consensusGroupId);
+ public boolean isNeedsDeleteInPartitionTable() {
+ return needsDeleteInPartitionTable;
}
- public Map<String, List<TConsensusGroupId>> getDeleteRegionMap() {
- return deleteRegionMap;
+ public void setNeedsDeleteInPartitionTable(boolean needsDeleteInPartitionTable) {
+ this.needsDeleteInPartitionTable = needsDeleteInPartitionTable;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigPhysicalPlanType.DeleteRegions.ordinal());
+ stream.writeInt(ConfigPhysicalPlanType.DeleteRegionGroups.ordinal());
- stream.writeInt(deleteRegionMap.size());
- for (Map.Entry<String, List<TConsensusGroupId>> consensusGroupIdsEntry :
- deleteRegionMap.entrySet()) {
- BasicStructureSerDeUtil.write(consensusGroupIdsEntry.getKey(), stream);
- stream.writeInt(consensusGroupIdsEntry.getValue().size());
- for (TConsensusGroupId consensusGroupId : consensusGroupIdsEntry.getValue()) {
- ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream);
+ stream.writeByte(needsDeleteInPartitionTable ? 1 : 0);
+ stream.writeInt(regionGroupMap.size());
+ for (Map.Entry<String, List<TRegionReplicaSet>> entry : regionGroupMap.entrySet()) {
+ BasicStructureSerDeUtil.write(entry.getKey(), stream);
+ stream.writeInt(entry.getValue().size());
+ for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
+ ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, stream);
}
}
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ needsDeleteInPartitionTable = buffer.get() > 0;
int length = buffer.getInt();
for (int i = 0; i < length; i++) {
String name = BasicStructureSerDeUtil.readString(buffer);
- deleteRegionMap.put(name, new ArrayList<>());
+ regionGroupMap.put(name, new ArrayList<>());
int regionNum = buffer.getInt();
for (int j = 0; j < regionNum; j++) {
- deleteRegionMap.get(name).add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
+ regionGroupMap.get(name).add(ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer));
}
}
}
@@ -82,12 +80,12 @@ public class DeleteRegionsPlan extends ConfigPhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- DeleteRegionsPlan that = (DeleteRegionsPlan) o;
- return deleteRegionMap.equals(that.deleteRegionMap);
+ DeleteRegionGroupsPlan that = (DeleteRegionGroupsPlan) o;
+ return regionGroupMap.equals(that.regionGroupMap);
}
@Override
public int hashCode() {
- return Objects.hash(deleteRegionMap);
+ return Objects.hash(regionGroupMap);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 8a825ca760..295cad7127 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -616,22 +616,7 @@ public class ConfigManager implements IManager {
}
private TSStatus confirmLeader() {
- TSStatus result = new TSStatus();
-
- if (getConsensusManager().isLeader()) {
- return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } else {
- result.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- result.setMessage(
- "The current ConfigNode is not leader, please redirect to a new ConfigNode.");
-
- TConfigNodeLocation leaderLocation = consensusManager.getLeader();
- if (leaderLocation != null) {
- result.setRedirectNode(leaderLocation.getInternalEndPoint());
- }
-
- return result;
- }
+ return getConsensusManager().confirmLeader();
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index f3d57f31ff..3ba78e601d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
@@ -36,6 +37,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -202,6 +204,30 @@ public class ConsensusManager {
return null;
}
+ /**
+ * Confirm the current ConfigNode's leadership
+ *
+ * @return SUCCESS_STATUS if the current ConfigNode is leader, NEED_REDIRECTION otherwise
+ */
+ public TSStatus confirmLeader() {
+ TSStatus result = new TSStatus();
+
+ if (isLeader()) {
+ return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } else {
+ result.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ result.setMessage(
+ "The current ConfigNode is not leader, please redirect to a new ConfigNode.");
+
+ TConfigNodeLocation leaderLocation = getLeader();
+ if (leaderLocation != null) {
+ result.setRedirectNode(leaderLocation.getInternalEndPoint());
+ }
+
+ return result;
+ }
+ }
+
public ConsensusGroupId getConsensusGroupId() {
return consensusGroupId;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index c66e9a72ae..6eba977594 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
@@ -54,6 +55,7 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -177,13 +179,22 @@ public class PartitionManager {
return resp;
}
- // Allocate SchemaPartitions
- Map<String, SchemaPartitionTable> assignedSchemaPartition =
- getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
- // Cache allocating result
- CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
- createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
- getConsensusManager().write(createPlan);
+ status = getConsensusManager().confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Here we check the leadership a second time,
+ // since the RegionGroup creation process might take some time
+ resp.setStatus(status);
+ return resp;
+ } else {
+ // Allocate SchemaPartitions only if
+ // the current ConfigNode still holds its leadership
+ Map<String, SchemaPartitionTable> assignedSchemaPartition =
+ getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+ // Cache allocating result
+ CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+ createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
+ getConsensusManager().write(createPlan);
+ }
}
return getSchemaPartition(req);
@@ -242,13 +253,22 @@ public class PartitionManager {
return resp;
}
- // Allocate DataPartitions
- Map<String, DataPartitionTable> assignedDataPartition =
- getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
- // Cache allocating result
- CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
- createPlan.setAssignedDataPartition(assignedDataPartition);
- getConsensusManager().write(createPlan);
+ status = getConsensusManager().confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Here we check the leadership a second time,
+ // since the RegionGroup creation process might take some time
+ resp.setStatus(status);
+ return resp;
+ } else {
+ // Allocate DataPartitions only if
+ // the current ConfigNode still holds its leadership
+ Map<String, DataPartitionTable> assignedDataPartition =
+ getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+ // Cache allocating result
+ CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+ createPlan.setAssignedDataPartition(assignedDataPartition);
+ getConsensusManager().write(createPlan);
+ }
}
return getDataPartition(req);
@@ -316,13 +336,13 @@ public class PartitionManager {
}
}
- // TODO: Use procedure to protect the following process
if (!allotmentMap.isEmpty()) {
- // Do Region allocation and creation for StorageGroups based on the allotment
- getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
+ CreateRegionGroupsPlan createRegionGroupsPlan =
+ getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
+ result = getProcedureManager().createRegionGroups(createRegionGroupsPlan);
+ } else {
+ result = RpcUtils.SUCCESS_STATUS;
}
-
- result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (NotEnoughDataNodeException e) {
LOGGER.error("ConfigNode failed to extend Region because there are not enough DataNodes");
result.setCode(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
@@ -467,10 +487,7 @@ public class PartitionManager {
return partitionInfo.getRegionStorageGroup(regionId);
}
- /**
- * Called by {@link PartitionManager#regionCleaner} Delete regions of logical deleted storage
- * groups periodically.
- */
+ /** Called by {@link PartitionManager#regionCleaner}. Deletes RegionGroups periodically. */
public void clearDeletedRegions() {
if (getConsensusManager().isLeader()) {
final Set<TRegionReplicaSet> deletedRegionSet = partitionInfo.getDeletedRegionSet();
@@ -565,4 +582,8 @@ public class PartitionManager {
private LoadManager getLoadManager() {
return configManager.getLoadManager();
}
+
+ private ProcedureManager getProcedureManager() {
+ return configManager.getProcedureManager();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index addbeb65bc..61027c1343 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
@@ -30,6 +31,7 @@ import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
@@ -43,11 +45,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -98,15 +102,15 @@ public class ProcedureManager {
}
public TSStatus deleteStorageGroups(ArrayList<TStorageGroupSchema> deleteSgSchemaList) {
- List<Long> procIdList = new ArrayList<>();
+ List<Long> procedureIds = new ArrayList<>();
for (TStorageGroupSchema storageGroupSchema : deleteSgSchemaList) {
DeleteStorageGroupProcedure deleteStorageGroupProcedure =
new DeleteStorageGroupProcedure(storageGroupSchema);
- long procId = this.executor.submitProcedure(deleteStorageGroupProcedure);
- procIdList.add(procId);
+ long procedureId = this.executor.submitProcedure(deleteStorageGroupProcedure);
+ procedureIds.add(procedureId);
}
List<TSStatus> procedureStatus = new ArrayList<>();
- boolean isSucceed = getProcedureStatus(this.executor, procIdList, procedureStatus);
+ boolean isSucceed = waitingProcedureFinished(procedureIds, procedureStatus);
// clear the previously deleted regions
final PartitionManager partitionManager = getConfigManager().getPartitionManager();
partitionManager.getRegionCleaner().submit(partitionManager::clearDeletedRegions);
@@ -146,24 +150,51 @@ public class ProcedureManager {
return true;
}
- private static boolean getProcedureStatus(
- ProcedureExecutor executor, List<Long> procIds, List<TSStatus> statusList) {
+ /**
+ * Generate a CreateRegionGroupsProcedure and wait for it to finish
+ *
+ * @return SUCCESS_STATUS if all RegionGroups were created successfully, CREATE_REGION_ERROR otherwise
+ */
+ public TSStatus createRegionGroups(CreateRegionGroupsPlan createRegionGroupsPlan) {
+ long procedureId =
+ executor.submitProcedure(new CreateRegionGroupsProcedure(createRegionGroupsPlan));
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ }
+
+ /**
+ * Wait until the specified procedures have finished
+ *
+ * @param procedureIds The IDs of the procedures to wait for
+ * @param statusList The corresponding running results of these procedures
+ * @return True if all Procedures finished successfully, false otherwise
+ */
+ private boolean waitingProcedureFinished(List<Long> procedureIds, List<TSStatus> statusList) {
boolean isSucceed = true;
- for (long procId : procIds) {
- long startTimeForProcId = System.currentTimeMillis();
+ for (long procedureId : procedureIds) {
+ long startTimeForCurrentProcedure = System.currentTimeMillis();
while (executor.isRunning()
- && !executor.isFinished(procId)
- && TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeForProcId)
+ && !executor.isFinished(procedureId)
+ && TimeUnit.MILLISECONDS.toSeconds(
+ System.currentTimeMillis() - startTimeForCurrentProcedure)
< procedureWaitTimeOut) {
sleepWithoutInterrupt(procedureWaitRetryTimeout);
}
- Procedure finishedProc = executor.getResultOrProcedure(procId);
- if (finishedProc.isSuccess()) {
+ Procedure<ConfigNodeProcedureEnv> finishedProcedure =
+ executor.getResultOrProcedure(procedureId);
+ if (finishedProcedure.isSuccess()) {
statusList.add(StatusUtils.OK);
} else {
statusList.add(
StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage(
- finishedProc.getException().getMessage()));
+ finishedProcedure.getException().getMessage()));
isSucceed = false;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index abebafa3c8..27b0a14ef5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -51,7 +51,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -98,31 +97,18 @@ public class LoadManager {
}
/**
- * Allocate and create Regions for each StorageGroup.
+ * Generate an optimal CreateRegionGroupsPlan
*
* @param allotmentMap Map<StorageGroupName, Region allotment>
- * @param consensusGroupType TConsensusGroupType of Region to be allocated
+ * @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
+ * @return CreateRegionGroupsPlan
+ * @throws NotEnoughDataNodeException If there are not enough DataNodes
+ * @throws StorageGroupNotExistsException If some specific StorageGroups don't exist
*/
- public void doRegionCreation(
+ public CreateRegionGroupsPlan allocateRegionGroups(
Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException, StorageGroupNotExistsException {
- CreateRegionGroupsPlan createRegionGroupsPlan =
- regionBalancer.genRegionsAllocationPlan(allotmentMap, consensusGroupType);
-
- // TODO: Use procedure to protect the following process
- // Create Regions on DataNodes
- Map<String, Long> ttlMap = new HashMap<>();
- for (String storageGroup : createRegionGroupsPlan.getRegionGroupMap().keySet()) {
- ttlMap.put(
- storageGroup,
- getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
- }
- AsyncDataNodeClientPool.getInstance().createRegions(createRegionGroupsPlan, ttlMap);
-
- // Persist the allocation result
- getConsensusManager().write(createRegionGroupsPlan);
- // Broadcast the latest RegionRouteMap
- broadcastLatestRegionRouteMap();
+ return regionBalancer.genRegionsAllocationPlan(allotmentMap, consensusGroupType);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b37baa8f66..83bd5c38f4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedurePlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
@@ -157,44 +158,50 @@ public class ConfigPlanExecutor {
}
}
- public TSStatus executeNonQueryPlan(ConfigPhysicalPlan req)
+ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
throws UnknownPhysicalPlanTypeException, AuthException {
- switch (req.getType()) {
+ switch (physicalPlan.getType()) {
case RegisterDataNode:
- return nodeInfo.registerDataNode((RegisterDataNodePlan) req);
+ return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan);
case RemoveDataNode:
- return nodeInfo.removeDataNode((RemoveDataNodePlan) req);
+ return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan);
case SetStorageGroup:
- TSStatus status = clusterSchemaInfo.setStorageGroup((SetStorageGroupPlan) req);
+ TSStatus status = clusterSchemaInfo.setStorageGroup((SetStorageGroupPlan) physicalPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
- return partitionInfo.setStorageGroup((SetStorageGroupPlan) req);
+ return partitionInfo.setStorageGroup((SetStorageGroupPlan) physicalPlan);
case AdjustMaxRegionGroupCount:
- return clusterSchemaInfo.adjustMaxRegionGroupCount((AdjustMaxRegionGroupCountPlan) req);
+ return clusterSchemaInfo.adjustMaxRegionGroupCount(
+ (AdjustMaxRegionGroupCountPlan) physicalPlan);
case DeleteStorageGroup:
- partitionInfo.deleteStorageGroup((DeleteStorageGroupPlan) req);
- return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupPlan) req);
+ partitionInfo.deleteStorageGroup((DeleteStorageGroupPlan) physicalPlan);
+ return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupPlan) physicalPlan);
case PreDeleteStorageGroup:
- return partitionInfo.preDeleteStorageGroup((PreDeleteStorageGroupPlan) req);
+ return partitionInfo.preDeleteStorageGroup((PreDeleteStorageGroupPlan) physicalPlan);
case SetTTL:
- return clusterSchemaInfo.setTTL((SetTTLPlan) req);
+ return clusterSchemaInfo.setTTL((SetTTLPlan) physicalPlan);
case SetSchemaReplicationFactor:
- return clusterSchemaInfo.setSchemaReplicationFactor((SetSchemaReplicationFactorPlan) req);
+ return clusterSchemaInfo.setSchemaReplicationFactor(
+ (SetSchemaReplicationFactorPlan) physicalPlan);
case SetDataReplicationFactor:
- return clusterSchemaInfo.setDataReplicationFactor((SetDataReplicationFactorPlan) req);
+ return clusterSchemaInfo.setDataReplicationFactor(
+ (SetDataReplicationFactorPlan) physicalPlan);
case SetTimePartitionInterval:
- return clusterSchemaInfo.setTimePartitionInterval((SetTimePartitionIntervalPlan) req);
+ return clusterSchemaInfo.setTimePartitionInterval(
+ (SetTimePartitionIntervalPlan) physicalPlan);
case CreateRegionGroups:
- return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) req);
+ return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) physicalPlan);
+ case DeleteRegionGroups:
+ return partitionInfo.deleteRegionGroups((DeleteRegionGroupsPlan) physicalPlan);
case CreateSchemaPartition:
- return partitionInfo.createSchemaPartition((CreateSchemaPartitionPlan) req);
+ return partitionInfo.createSchemaPartition((CreateSchemaPartitionPlan) physicalPlan);
case CreateDataPartition:
- return partitionInfo.createDataPartition((CreateDataPartitionPlan) req);
+ return partitionInfo.createDataPartition((CreateDataPartitionPlan) physicalPlan);
case UpdateProcedure:
- return procedureInfo.updateProcedure((UpdateProcedurePlan) req);
+ return procedureInfo.updateProcedure((UpdateProcedurePlan) physicalPlan);
case DeleteProcedure:
- return procedureInfo.deleteProcedure((DeleteProcedurePlan) req);
+ return procedureInfo.deleteProcedure((DeleteProcedurePlan) physicalPlan);
case CreateUser:
case CreateRole:
case DropUser:
@@ -206,23 +213,23 @@ public class ConfigPlanExecutor {
case RevokeRole:
case RevokeRoleFromUser:
case UpdateUser:
- return authorInfo.authorNonQuery((AuthorPlan) req);
+ return authorInfo.authorNonQuery((AuthorPlan) physicalPlan);
case ApplyConfigNode:
- return nodeInfo.applyConfigNode((ApplyConfigNodePlan) req);
+ return nodeInfo.applyConfigNode((ApplyConfigNodePlan) physicalPlan);
case RemoveConfigNode:
- return nodeInfo.removeConfigNode((RemoveConfigNodePlan) req);
+ return nodeInfo.removeConfigNode((RemoveConfigNodePlan) physicalPlan);
case CreateFunction:
- return udfInfo.createFunction((CreateFunctionPlan) req);
+ return udfInfo.createFunction((CreateFunctionPlan) physicalPlan);
case DropFunction:
- return udfInfo.dropFunction((DropFunctionPlan) req);
+ return udfInfo.dropFunction((DropFunctionPlan) physicalPlan);
case CreateSchemaTemplate:
- return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) req);
+ return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan);
case UpdateRegionLocation:
- return partitionInfo.updateRegionLocation((UpdateRegionLocationPlan) req);
+ return partitionInfo.updateRegionLocation((UpdateRegionLocationPlan) physicalPlan);
case SetSchemaTemplate:
- return clusterSchemaInfo.setSchemaTemplate((SetSchemaTemplatePlan) req);
+ return clusterSchemaInfo.setSchemaTemplate((SetSchemaTemplatePlan) physicalPlan);
default:
- throw new UnknownPhysicalPlanTypeException(req.getType());
+ throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 7aa02188e7..2f9b23a882 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupPlan;
@@ -103,9 +104,6 @@ public class PartitionInfo implements SnapshotProcessor {
public PartitionInfo() {
this.storageGroupPartitionTables = new ConcurrentHashMap<>();
this.nextRegionGroupId = new AtomicInteger(-1);
-
- // Ensure that the PartitionTables of the StorageGroups who've been logically deleted
- // are unreadable and un-writable
// For RegionCleaner
this.deletedRegionSet = Collections.synchronizedSet(new HashSet<>());
}
@@ -195,6 +193,42 @@ public class PartitionInfo implements SnapshotProcessor {
return result;
}
+ /**
+ * Synchronously delete RegionGroups in PartitionTable and asynchronously delete RegionReplica on
+ * remote DataNodes
+ *
+ * @return SUCCESS_STATUS
+ */
+ public TSStatus deleteRegionGroups(DeleteRegionGroupsPlan deleteRegionGroupsPlan) {
+ // Delete RegionGroups in PartitionTable if necessary
+ if (deleteRegionGroupsPlan.isNeedsDeleteInPartitionTable()) {
+ deleteRegionGroupsPlan
+ .getRegionGroupMap()
+ .forEach(
+ (storageGroup, deleteRegionGroups) -> {
+ if (isStorageGroupExisted(storageGroup)) {
+ storageGroupPartitionTables
+ .get(storageGroup)
+ .deleteRegionGroups(deleteRegionGroups);
+ }
+ });
+ }
+
+ // Delete RegionReplicaSets on remote DataNodes asynchronously
+ synchronized (deletedRegionSet) {
+ deleteRegionGroupsPlan.getRegionGroupMap().values().forEach(deletedRegionSet::addAll);
+ }
+
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ /** @return The Regions that should be deleted among the DataNodes */
+ public Set<TRegionReplicaSet> getDeletedRegionSet() {
+ synchronized (deletedRegionSet) {
+ return deletedRegionSet;
+ }
+ }
+
/**
* Thread-safely pre-delete the specific StorageGroup
*
@@ -244,13 +278,6 @@ public class PartitionInfo implements SnapshotProcessor {
}
}
- /** @return The Regions that should be deleted among the DataNodes */
- public Set<TRegionReplicaSet> getDeletedRegionSet() {
- synchronized (deletedRegionSet) {
- return deletedRegionSet;
- }
- }
-
/**
* Thread-safely get SchemaPartition
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 37690984b0..1cffbb9b72 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -148,6 +148,15 @@ public class StorageGroupPartitionTable {
replicaSet -> regionGroupMap.put(replicaSet.getRegionId(), new RegionGroup(replicaSet)));
}
+ /**
+ * Delete RegionGroups' cache
+ *
+ * @param replicaSets List<TRegionReplicaSet>
+ */
+ public void deleteRegionGroups(List<TRegionReplicaSet> replicaSets) {
+ replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
+ }
+
/** @return All Regions' RegionReplicaSet within one StorageGroup */
public List<TRegionReplicaSet> getAllReplicaSets() {
List<TRegionReplicaSet> result = new ArrayList<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 9daa2d4ea7..b54f9132e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -20,20 +20,27 @@
package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
import org.apache.iotdb.confignode.exception.AddPeerException;
+import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
+import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -48,7 +55,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
public class ConfigNodeProcedureEnv {
@@ -288,6 +297,39 @@ public class ConfigNodeProcedureEnv {
configManager.getNodeManager().getRegisteredConfigNodes());
}
+ /**
+ * Broadcast the CreateRegionGroupsPlan
+ *
+ * @return Those RegionGroups that failed to be created
+ */
+ public Map<TConsensusGroupId, TRegionReplicaSet> doRegionCreation(
+ CreateRegionGroupsPlan createRegionGroupsPlan) {
+ Map<String, Long> ttlMap = new HashMap<>();
+ for (String storageGroup : createRegionGroupsPlan.getRegionGroupMap().keySet()) {
+ try {
+ ttlMap.put(
+ storageGroup,
+ getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
+ } catch (StorageGroupNotExistsException e) {
+ // Notice: This branch should never be reached
+ LOG.error("StorageGroup doesn't exist", e);
+ }
+ }
+ return AsyncDataNodeClientPool.getInstance().createRegionGroups(createRegionGroupsPlan, ttlMap);
+ }
+
+ public void persistAndBroadcastRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) {
+ // Persist the allocation result
+ getConsensusManager().write(createRegionGroupsPlan);
+ // Broadcast the latest RegionRouteMap
+ getLoadManager().broadcastLatestRegionRouteMap();
+ }
+
+ /** Submit the RegionReplicas to the RegionCleaner when there are creation failures */
+ public void submitFailedRegionReplicas(DeleteRegionGroupsPlan deleteRegionGroupsPlan) {
+ getConsensusManager().write(deleteRegionGroupsPlan);
+ }
+
public LockQueue getNodeLock() {
return nodeLock;
}
@@ -311,4 +353,12 @@ public class ConfigNodeProcedureEnv {
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
+
+ private ClusterSchemaManager getClusterSchemaManager() {
+ return configManager.getClusterSchemaManager();
+ }
+
+ private LoadManager getLoadManager() {
+ return configManager.getLoadManager();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
new file mode 100644
index 0000000000..32877571bb
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
+import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
+
+import java.util.Map;
+
+public class CreateRegionGroupsProcedure
+ extends StateMachineProcedure<ConfigNodeProcedureEnv, CreateRegionGroupsState> {
+
+ private final CreateRegionGroupsPlan createRegionGroupsPlan;
+ // Map<TConsensusGroupId, Failed RegionReplicas>
+ private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions;
+
+ public CreateRegionGroupsProcedure(CreateRegionGroupsPlan createRegionGroupsPlan) {
+ this.createRegionGroupsPlan = createRegionGroupsPlan;
+ }
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) {
+ switch (state) {
+ case CREATE_REGION_GROUPS_PREPARE:
+ setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS);
+ break;
+ case CREATE_REGION_GROUPS:
+ failedRegions = env.doRegionCreation(createRegionGroupsPlan);
+ setNextState(CreateRegionGroupsState.PERSIST_AND_BROADCAST);
+ break;
+ case PERSIST_AND_BROADCAST:
+ // Keep only those RegionGroups that were created successfully
+ CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+ createRegionGroupsPlan
+ .getRegionGroupMap()
+ .forEach(
+ (storageGroup, regionReplicaSets) ->
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ if (!failedRegions.containsKey(regionReplicaSet.getRegionId())) {
+ persistPlan.addRegionGroup(storageGroup, regionReplicaSet);
+ }
+ }));
+ env.persistAndBroadcastRegionGroup(persistPlan);
+ setNextState(
+ failedRegions.size() > 0
+ ? CreateRegionGroupsState.DELETE_FAILED_REGION_GROUPS
+ : CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
+ break;
+ case DELETE_FAILED_REGION_GROUPS:
+ DeleteRegionGroupsPlan deletePlan = new DeleteRegionGroupsPlan();
+ // We don't need to wipe the PartitionTable here
+ // since the failed RegionGroups are not recorded
+ deletePlan.setNeedsDeleteInPartitionTable(false);
+ createRegionGroupsPlan
+ .getRegionGroupMap()
+ .forEach(
+ (storageGroup, regionReplicaSets) ->
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ if (failedRegions.containsKey(regionReplicaSet.getRegionId())) {
+ TRegionReplicaSet failedReplicaSet =
+ failedRegions.get(regionReplicaSet.getRegionId());
+ TRegionReplicaSet redundantReplicaSet =
+ new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId());
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+ if (!failedReplicaSet
+ .getDataNodeLocations()
+ .contains(dataNodeLocation)) {
+ redundantReplicaSet.addToDataNodeLocations(
+ dataNodeLocation);
+ }
+ });
+ deletePlan.addRegionGroup(storageGroup, redundantReplicaSet);
+ }
+ }));
+ env.submitFailedRegionReplicas(deletePlan);
+ setFailure(
+ new ProcedureException(
+ "There are some RegionGroups failed to create, please check former logs in ConfigNode-leader."));
+ return Flow.NO_MORE_STATE;
+ case CREATE_REGION_GROUPS_FINISH:
+ return Flow.NO_MORE_STATE;
+ }
+
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(
+ ConfigNodeProcedureEnv configNodeProcedureEnv,
+ CreateRegionGroupsState createRegionGroupsState) {
+ // Do nothing
+ }
+
+ @Override
+ protected CreateRegionGroupsState getState(int stateId) {
+ return CreateRegionGroupsState.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(CreateRegionGroupsState createRegionGroupsState) {
+ return createRegionGroupsState.ordinal();
+ }
+
+ @Override
+ protected CreateRegionGroupsState getInitialState() {
+ return CreateRegionGroupsState.CREATE_REGION_GROUPS;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
new file mode 100644
index 0000000000..35855420a0
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.state;
+
+public enum CreateRegionGroupsState {
+ CREATE_REGION_GROUPS_PREPARE,
+ CREATE_REGION_GROUPS,
+ PERSIST_AND_BROADCAST,
+ DELETE_FAILED_REGION_GROUPS,
+ CREATE_REGION_GROUPS_FINISH
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index dd28aebfdb..59f0eedaa9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
@@ -73,6 +74,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.REMOVE_DATA_NODE_PROCEDURE;
} else if (procedure instanceof RegionMigrateProcedure) {
return ProcedureType.REGION_MIGRATE_PROCEDURE;
+ } else if (procedure instanceof CreateRegionGroupsProcedure) {
+ return ProcedureType.CREATE_REGION_GROUPS;
}
return null;
}
@@ -82,7 +85,8 @@ public class ProcedureFactory implements IProcedureFactory {
ADD_CONFIG_NODE_PROCEDURE,
REMOVE_CONFIG_NODE_PROCEDURE,
REMOVE_DATA_NODE_PROCEDURE,
- REGION_MIGRATE_PROCEDURE
+ REGION_MIGRATE_PROCEDURE,
+ CREATE_REGION_GROUPS
}
private static class ProcedureFactoryHolder {
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 64a2ec3472..32429cec1f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -53,7 +53,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPl
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedurePlan;
-import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsPlan;
+import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -235,12 +235,23 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void DeleteRegionsPlanTest() throws IOException {
- DeleteRegionsPlan req0 = new DeleteRegionsPlan();
- req0.addDeleteRegion("sg", new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0));
- req0.addDeleteRegion("sg", new TConsensusGroupId(TConsensusGroupType.DataRegion, 1));
+ TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+ dataNodeLocation.setDataNodeId(0);
+ dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+ dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
+ dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
- DeleteRegionsPlan req1 =
- (DeleteRegionsPlan) ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer());
+ DeleteRegionGroupsPlan req0 = new DeleteRegionGroupsPlan();
+ req0.setNeedsDeleteInPartitionTable(false);
+ TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
+ dataRegionSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+ dataRegionSet.setDataNodeLocations(Collections.singletonList(dataNodeLocation));
+ req0.addRegionGroup("root.sg0", dataRegionSet);
+
+ DeleteRegionGroupsPlan req1 =
+ (DeleteRegionGroupsPlan) ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer());
Assert.assertEquals(req0, req1);
}
@@ -691,16 +702,20 @@ public class ConfigPhysicalPlanSerDeTest {
private CreateSchemaTemplateStatement newCreateSchemaTemplateStatement(String name) {
List<List<String>> measurements =
Arrays.asList(
- Arrays.asList(name + "_" + "temperature"), Arrays.asList(name + "_" + "status"));
+ Collections.singletonList(name + "_" + "temperature"),
+ Collections.singletonList(name + "_" + "status"));
List<List<TSDataType>> dataTypes =
- Arrays.asList(Arrays.asList(TSDataType.FLOAT), Arrays.asList(TSDataType.BOOLEAN));
+ Arrays.asList(
+ Collections.singletonList(TSDataType.FLOAT),
+ Collections.singletonList(TSDataType.BOOLEAN));
List<List<TSEncoding>> encodings =
- Arrays.asList(Arrays.asList(TSEncoding.RLE), Arrays.asList(TSEncoding.PLAIN));
+ Arrays.asList(
+ Collections.singletonList(TSEncoding.RLE), Collections.singletonList(TSEncoding.PLAIN));
List<List<CompressionType>> compressors =
- Arrays.asList(Arrays.asList(CompressionType.SNAPPY), Arrays.asList(CompressionType.SNAPPY));
- CreateSchemaTemplateStatement createSchemaTemplateStatement =
- new CreateSchemaTemplateStatement(name, measurements, dataTypes, encodings, compressors);
- return createSchemaTemplateStatement;
+ Arrays.asList(
+ Collections.singletonList(CompressionType.SNAPPY),
+ Collections.singletonList(CompressionType.SNAPPY));
+ return new CreateSchemaTemplateStatement(name, measurements, dataTypes, encodings, compressors);
}
@Test