You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/16 01:50:00 UTC
[iotdb] branch master updated: [IOTDB-3183] Cancel the singleton mode in the ConfigNode's persistence module (#5913)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7942b02c47 [IOTDB-3183] Cancel the singleton mode in the ConfigNode's persistence module (#5913)
7942b02c47 is described below
commit 7942b02c47317e70824764d6181e02857bdaac12
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Mon May 16 09:49:56 2022 +0800
[IOTDB-3183] Cancel the singleton mode in the ConfigNode's persistence module (#5913)
---
.../statemachine/PartitionRegionStateMachine.java | 6 +-
.../confignode/manager/ClusterSchemaManager.java | 18 +++++-
.../iotdb/confignode/manager/ConfigManager.java | 35 ++++++++---
.../iotdb/confignode/manager/ConsensusManager.java | 10 ++--
.../apache/iotdb/confignode/manager/Manager.java | 4 +-
.../iotdb/confignode/manager/NodeManager.java | 15 +++--
.../iotdb/confignode/manager/PartitionManager.java | 16 ++++-
.../confignode/manager/PermissionManager.java | 8 ++-
.../iotdb/confignode/manager/ProcedureManager.java | 20 ++++---
.../iotdb/confignode/persistence/AuthorInfo.java | 12 ----
.../confignode/persistence/ClusterSchemaInfo.java | 68 ++++++++++------------
.../iotdb/confignode/persistence/NodeInfo.java | 15 +----
.../confignode/persistence/PartitionInfo.java | 30 ++++------
.../confignode/persistence/ProcedureInfo.java | 13 -----
.../executor/ConfigRequestExecutor.java | 18 ++++--
.../confignode/procedure/ConfigProcedureStore.java | 9 ++-
.../procedure/DeleteStorageGroupProcedure.java | 18 ++++--
.../procedure/DeleteStorageGroupState.java | 2 +-
.../hash/DeviceGroupHashExecutorManualTest.java | 3 +-
.../confignode/persistence/AuthorInfoTest.java | 2 +-
.../persistence/ClusterSchemaInfoTest.java | 2 +-
.../iotdb/confignode/persistence/NodeInfoTest.java | 2 +-
.../confignode/persistence/PartitionInfoTest.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 8 +--
24 files changed, 170 insertions(+), 166 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 1c87318a63..26a077d376 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -45,10 +45,10 @@ public class PartitionRegionStateMachine implements IStateMachine, IStateMachine
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
private final ConfigRequestExecutor executor;
private ConfigManager configManager;
- private TEndPoint currentNode;
+ private final TEndPoint currentNode;
- public PartitionRegionStateMachine(ConfigManager configManager) {
- this.executor = new ConfigRequestExecutor();
+ public PartitionRegionStateMachine(ConfigManager configManager, ConfigRequestExecutor executor) {
+ this.executor = executor;
this.configManager = configManager;
this.currentNode =
new TEndPoint()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 7c0ece7553..baf67a015a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -41,18 +41,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
/** The ClusterSchemaManager Manages cluster schema read and write requests. */
public class ClusterSchemaManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchemaManager.class);
- private static final ClusterSchemaInfo clusterSchemaInfo = ClusterSchemaInfo.getInstance();
-
private final Manager configManager;
+ private final ClusterSchemaInfo clusterSchemaInfo;
- public ClusterSchemaManager(Manager configManager) {
+ public ClusterSchemaManager(Manager configManager, ClusterSchemaInfo clusterSchemaInfo) {
this.configManager = configManager;
+ this.clusterSchemaInfo = clusterSchemaInfo;
}
/**
@@ -106,6 +107,17 @@ public class ClusterSchemaManager {
return clusterSchemaInfo.getMatchedStorageGroupSchemaByName(storageGroup);
}
+ /**
+ * Only leader use this interface.
+ *
+ * @param rawPathList List<StorageGroupName>
+ * @return the matched StorageGroupSchemas
+ */
+ public Map<String, TStorageGroupSchema> getMatchedStorageGroupSchemasByName(
+ List<String> rawPathList) {
+ return clusterSchemaInfo.getMatchedStorageGroupSchemasByName(rawPathList);
+ }
+
public TSStatus setTTL(SetTTLReq setTTLReq) {
// TODO: Inform DataNodes
return getConsensusManager().write(setTTLReq).getStatus();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 278859b9c4..69561b7fcd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -48,9 +48,14 @@ import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
+import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.persistence.AuthorInfo;
import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
+import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -95,13 +100,27 @@ public class ConfigManager implements Manager {
private final ProcedureManager procedureManager;
public ConfigManager() throws IOException {
- this.nodeManager = new NodeManager(this);
- this.partitionManager = new PartitionManager(this);
- this.clusterSchemaManager = new ClusterSchemaManager(this);
- this.permissionManager = new PermissionManager(this);
+ // Build the persistence module
+ NodeInfo nodeInfo = new NodeInfo();
+ ClusterSchemaInfo clusterSchemaInfo = new ClusterSchemaInfo();
+ PartitionInfo partitionInfo = new PartitionInfo();
+ AuthorInfo authorInfo = new AuthorInfo();
+ ProcedureInfo procedureInfo = new ProcedureInfo();
+
+ // Build state machine and executor
+ ConfigRequestExecutor executor =
+ new ConfigRequestExecutor(
+ nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo);
+ PartitionRegionStateMachine stateMachine = new PartitionRegionStateMachine(this, executor);
+
+ // Build the manager module
+ this.nodeManager = new NodeManager(this, nodeInfo);
+ this.clusterSchemaManager = new ClusterSchemaManager(this, clusterSchemaInfo);
+ this.partitionManager = new PartitionManager(this, partitionInfo);
+ this.permissionManager = new PermissionManager(this, authorInfo);
+ this.procedureManager = new ProcedureManager(this, procedureInfo);
this.loadManager = new LoadManager(this);
- this.procedureManager = new ProcedureManager(this);
- this.consensusManager = new ConsensusManager(this);
+ this.consensusManager = new ConsensusManager(stateMachine);
// We are on testing.......
if (ConfigNodeDescriptor.getInstance().getConf().isEnableHeartbeat()) {
@@ -128,7 +147,7 @@ public class ConfigManager implements Manager {
} else {
DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
dataSet.setStatus(status);
- dataSet.setConfigNodeList(NodeInfo.getInstance().getOnlineConfigNodes());
+ dataSet.setConfigNodeList(nodeManager.getOnlineConfigNodes());
return dataSet;
}
}
@@ -228,7 +247,7 @@ public class ConfigManager implements Manager {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// remove wild
Map<String, TStorageGroupSchema> deleteStorageSchemaMap =
- ClusterSchemaInfo.getInstance().getDeleteStorageGroups(deletedPaths);
+ getClusterSchemaManager().getMatchedStorageGroupSchemasByName(deletedPaths);
for (Map.Entry<String, TStorageGroupSchema> storageGroupSchemaEntry :
deleteStorageSchemaMap.entrySet()) {
String sgName = storageGroupSchemaEntry.getKey();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 58ffbf2970..702f5cfa05 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -50,13 +50,11 @@ public class ConsensusManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
- private final ConfigManager configManager;
private ConsensusGroupId consensusGroupId;
private IConsensus consensusImpl;
- public ConsensusManager(ConfigManager configManager) throws IOException {
- this.configManager = configManager;
- setConsensusLayer();
+ public ConsensusManager(PartitionRegionStateMachine stateMachine) throws IOException {
+ setConsensusLayer(stateMachine);
}
public void close() throws IOException {
@@ -82,7 +80,7 @@ public class ConsensusManager {
}
/** Build ConfigNodeGroup ConsensusLayer */
- private void setConsensusLayer() throws IOException {
+ private void setConsensusLayer(PartitionRegionStateMachine stateMachine) throws IOException {
// There is only one ConfigNodeGroup
consensusGroupId = new PartitionRegionId(conf.getPartitionRegionId());
@@ -92,7 +90,7 @@ public class ConsensusManager {
conf.getConfigNodeConsensusProtocolClass(),
new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort()),
new File(conf.getConsensusDir()),
- gid -> new PartitionRegionStateMachine(configManager))
+ gid -> stateMachine)
.orElseThrow(
() ->
new IllegalArgumentException(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 6a7fc0a35f..dbb95ca560 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -132,9 +132,9 @@ public interface Manager {
TSStatus setStorageGroup(SetStorageGroupReq setStorageGroupReq);
/**
- * Delete StorageGroup
+ * Delete StorageGroups
*
- * @param deleteStorageGroupsReq deleteStorageGroupReq
+ * @param deletedPaths List<StringPattern>
* @return status
*/
TSStatus deleteStorageGroups(List<String> deletedPaths);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b5d778e8b5..ea56c4cf09 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -47,15 +48,15 @@ public class NodeManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
- private static final NodeInfo nodeInfo = NodeInfo.getInstance();
-
private final Manager configManager;
+ private final NodeInfo nodeInfo;
/** TODO:do some operate after add node or remove node */
private final List<ChangeServerListener> listeners = new CopyOnWriteArrayList<>();
- public NodeManager(Manager configManager) {
+ public NodeManager(Manager configManager, NodeInfo nodeInfo) {
this.configManager = configManager;
+ this.nodeInfo = nodeInfo;
}
private void setGlobalConfig(DataNodeConfigurationResp dataSet) {
@@ -82,7 +83,7 @@ public class NodeManager {
public DataSet registerDataNode(RegisterDataNodeReq req) {
DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
- if (NodeInfo.getInstance().containsValue(req.getLocation())) {
+ if (nodeInfo.containsValue(req.getLocation())) {
// Reset client
AsyncDataNodeClientPool.getInstance().resetClient(req.getLocation().getInternalEndPoint());
@@ -91,7 +92,7 @@ public class NodeManager {
dataSet.setStatus(status);
} else {
// Persist DataNodeInfo
- req.getLocation().setDataNodeId(NodeInfo.getInstance().generateNextDataNodeId());
+ req.getLocation().setDataNodeId(nodeInfo.generateNextDataNodeId());
ConsensusWriteResponse resp = getConsensusManager().write(req);
dataSet.setStatus(resp.getStatus());
}
@@ -167,6 +168,10 @@ public class NodeManager {
}
}
+ public List<TConfigNodeLocation> getOnlineConfigNodes() {
+ return nodeInfo.getOnlineConfigNodes();
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index f0db7b0d05..ee6e6f3520 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -57,14 +57,14 @@ public class PartitionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
- private static final PartitionInfo partitionInfo = PartitionInfo.getInstance();
-
private final Manager configManager;
+ private final PartitionInfo partitionInfo;
private SeriesPartitionExecutor executor;
- public PartitionManager(Manager configManager) {
+ public PartitionManager(Manager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
+ this.partitionInfo = partitionInfo;
setSeriesPartitionExecutor();
}
@@ -294,6 +294,16 @@ public class PartitionManager {
return partitionInfo.generateNextRegionGroupId();
}
+ /**
+ * Only leader use this interface.
+ *
+ * @param groupIds List<TConsensusGroupId>
+ * @return RegionReplicaSet by the specific TConsensusGroupIds
+ */
+ public List<TRegionReplicaSet> getRegionReplicaSets(List<TConsensusGroupId> groupIds) {
+ return partitionInfo.getRegionReplicaSets(groupIds);
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 61ef6a614a..7cbae5a3d8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -30,9 +30,11 @@ import java.util.List;
public class PermissionManager {
private final Manager configManager;
+ private final AuthorInfo authorInfo;
- public PermissionManager(Manager configManager) {
+ public PermissionManager(Manager configManager, AuthorInfo authorInfo) {
this.configManager = configManager;
+ this.authorInfo = authorInfo;
}
/**
@@ -60,10 +62,10 @@ public class PermissionManager {
}
public TSStatus login(String username, String password) {
- return AuthorInfo.getInstance().login(username, password);
+ return authorInfo.login(username, password);
}
public TSStatus checkUserPrivileges(String username, List<String> paths, int permission) {
- return AuthorInfo.getInstance().checkUserPrivileges(username, paths, permission);
+ return authorInfo.checkUserPrivileges(username, paths, permission);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index d120a9ca04..f433e69234 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.ConfigProcedureStore;
import org.apache.iotdb.confignode.procedure.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -44,19 +45,22 @@ import java.util.concurrent.TimeUnit;
public class ProcedureManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureManager.class);
+
+ private static final ConfigNodeConf configNodeConf = ConfigNodeDescriptor.getInstance().getConf();
+
private static final int procedureWaitTimeOut = 30;
private static final int procedureWaitRetryTimeout = 250;
- private final ConfigManager configNodeManager;
+
+ private final ConfigManager configManager;
private ProcedureExecutor<ConfigNodeProcedureEnv> executor;
private ProcedureScheduler scheduler;
private IProcedureStore store;
private ConfigNodeProcedureEnv env;
- private ConfigNodeConf configNodeConf = ConfigNodeDescriptor.getInstance().getConf();
- public ProcedureManager(ConfigManager configManager) {
- this.configNodeManager = configManager;
+ public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo) {
+ this.configManager = configManager;
this.scheduler = new SimpleProcedureScheduler();
- this.store = new ConfigProcedureStore(configManager);
+ this.store = new ConfigProcedureStore(configManager, procedureInfo);
this.env = new ConfigNodeProcedureEnv(configManager);
this.executor = new ProcedureExecutor<>(env, store, scheduler);
}
@@ -86,7 +90,7 @@ public class ProcedureManager {
List<Long> procIdList = new ArrayList<>();
for (TStorageGroupSchema storageGroupSchema : deleteSgSchemaList) {
DeleteStorageGroupProcedure deleteStorageGroupProcedure =
- new DeleteStorageGroupProcedure(storageGroupSchema);
+ new DeleteStorageGroupProcedure(storageGroupSchema, configManager);
long procId = this.executor.submitProcedure(deleteStorageGroupProcedure);
procIdList.add(procId);
}
@@ -145,8 +149,8 @@ public class ProcedureManager {
GET-SET Region
*/
// ======================================================
- public Manager getConfigNodeManager() {
- return configNodeManager;
+ public Manager getConfigManager() {
+ return configManager;
}
public ProcedureExecutor<ConfigNodeProcedureEnv> getExecutor() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
index 72704fd7da..3c8563b10f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
@@ -340,18 +340,6 @@ public class AuthorInfo implements SnapshotProcessor {
}
}
- private static class AuthorInfoHolder {
- private static final AuthorInfo INSTANCE = new AuthorInfo();
-
- private AuthorInfoHolder() {
- // empty constructor
- }
- }
-
- public static AuthorInfo getInstance() {
- return AuthorInfo.AuthorInfoHolder.INSTANCE;
- }
-
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
return authorizer.processTakeSnapshot(snapshotDir);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
index 04a5ef47a9..9bc8e67d59 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
@@ -77,7 +77,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
private final String snapshotFileName = "cluster_schema.bin";
- private ClusterSchemaInfo() {
+ public ClusterSchemaInfo() {
storageGroupReadWriteLock = new ReentrantReadWriteLock();
try {
@@ -136,7 +136,6 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
TStorageGroupSchema storageGroupSchema = req.getStorageGroup();
PartialPath partialPathName = new PartialPath(storageGroupSchema.getName());
mTree.deleteStorageGroup(partialPathName);
- PartitionInfo.getInstance().deleteStorageGroup(req);
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (MetadataException e) {
LOGGER.warn("Storage group not exist", e);
@@ -345,27 +344,6 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
return result;
}
- /** @return All StorageGroupSchemas that matches to the specific StorageGroup patterns */
- public Map<String, TStorageGroupSchema> getDeleteStorageGroups(List<String> rawPathList) {
- Map<String, TStorageGroupSchema> schemaMap = new HashMap<>();
- storageGroupReadWriteLock.readLock().lock();
- try {
- for (String rawPath : rawPathList) {
- PartialPath patternPath = new PartialPath(rawPath);
- List<PartialPath> matchedPaths = mTree.getBelongedStorageGroups(patternPath);
- for (PartialPath path : matchedPaths) {
- schemaMap.put(
- path.getFullPath(), mTree.getStorageGroupNodeByPath(path).getStorageGroupSchema());
- }
- }
- } catch (MetadataException e) {
- LOGGER.warn("Error StorageGroup name", e);
- } finally {
- storageGroupReadWriteLock.readLock().unlock();
- }
- return schemaMap;
- }
-
/** @return True if StorageGroupInfo contains the specific StorageGroup */
public boolean containsStorageGroup(String storageName) {
boolean result;
@@ -400,6 +378,28 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
}
}
+ /** @return All StorageGroupSchemas that matches to the specific StorageGroup patterns */
+ public Map<String, TStorageGroupSchema> getMatchedStorageGroupSchemasByName(
+ List<String> rawPathList) {
+ Map<String, TStorageGroupSchema> schemaMap = new HashMap<>();
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ for (String rawPath : rawPathList) {
+ PartialPath patternPath = new PartialPath(rawPath);
+ List<PartialPath> matchedPaths = mTree.getBelongedStorageGroups(patternPath);
+ for (PartialPath path : matchedPaths) {
+ schemaMap.put(
+ path.getFullPath(), mTree.getStorageGroupNodeByPath(path).getStorageGroupSchema());
+ }
+ }
+ } catch (MetadataException e) {
+ LOGGER.warn("Error StorageGroup name", e);
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ return schemaMap;
+ }
+
/**
* Get the SchemaRegionGroupIds or DataRegionGroupIds from the specific StorageGroup.
*
@@ -459,7 +459,14 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
return tmpFile.renameTo(snapshotFile);
} finally {
buffer.clear();
- tmpFile.delete();
+ for (int retry = 0; retry < 5; retry++) {
+ if (tmpFile.delete()) {
+ break;
+ } else {
+ LOGGER.warn(
+ "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
+ }
+ }
storageGroupReadWriteLock.readLock().unlock();
}
}
@@ -493,17 +500,4 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
public void clear() {
mTree.clear();
}
-
- private static class StorageGroupInfoHolder {
-
- private static final ClusterSchemaInfo INSTANCE = new ClusterSchemaInfo();
-
- private StorageGroupInfoHolder() {
- // Empty constructor
- }
- }
-
- public static ClusterSchemaInfo getInstance() {
- return StorageGroupInfoHolder.INSTANCE;
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 8fe90d1c0c..343ec9e667 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -98,7 +98,7 @@ public class NodeInfo implements SnapshotProcessor {
private final String snapshotFileName = "node_info.bin";
- private NodeInfo() {
+ public NodeInfo() {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.onlineConfigNodes =
@@ -407,17 +407,4 @@ public class NodeInfo implements SnapshotProcessor {
drainingDataNodes.clear();
onlineConfigNodes.clear();
}
-
- private static class DataNodeInfoPersistenceHolder {
-
- private static final NodeInfo INSTANCE = new NodeInfo();
-
- private DataNodeInfoPersistenceHolder() {
- // empty constructor
- }
- }
-
- public static NodeInfo getInstance() {
- return NodeInfo.DataNodeInfoPersistenceHolder.INSTANCE;
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
index 1e36ebb832..20a701ef34 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
@@ -92,7 +92,7 @@ public class PartitionInfo implements SnapshotProcessor {
private final String snapshotFileName = "partition_info.bin";
- private PartitionInfo() {
+ public PartitionInfo() {
this.regionReadWriteLock = new ReentrantReadWriteLock();
this.regionMap = new HashMap<>();
@@ -171,12 +171,11 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * Delete Regions
+ * Delete StorageGroup
*
* @param req DeleteRegionsReq
- * @return SUCCESS_STATUS
*/
- public TSStatus deleteStorageGroup(DeleteStorageGroupReq req) {
+ public void deleteStorageGroup(DeleteStorageGroupReq req) {
TStorageGroupSchema storageGroupSchema = req.getStorageGroup();
List<TConsensusGroupId> dataRegionGroupIds = storageGroupSchema.getDataRegionGroupIds();
List<TConsensusGroupId> schemaRegionGroupIds = storageGroupSchema.getSchemaRegionGroupIds();
@@ -190,7 +189,6 @@ public class PartitionInfo implements SnapshotProcessor {
deleteRegions(deleteRegionsReq);
deleteDataPartitionMapByStorageGroup(storageGroupSchema.getName());
deleteSchemaPartitionMapByStorageGroup(storageGroupSchema.getName());
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/**
@@ -416,7 +414,14 @@ public class PartitionInfo implements SnapshotProcessor {
unlockAllRead();
byteBuffer.clear();
// with or without success, delete temporary files anyway
- tmpFile.delete();
+ for (int retry = 0; retry < 5; retry++) {
+ if (tmpFile.delete()) {
+ break;
+ } else {
+ LOGGER.warn(
+ "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
+ }
+ }
}
}
@@ -529,17 +534,4 @@ public class PartitionInfo implements SnapshotProcessor {
dataPartition.getDataPartitionMap().clear();
}
}
-
- private static class PartitionInfoHolder {
-
- private static final PartitionInfo INSTANCE = new PartitionInfo();
-
- private PartitionInfoHolder() {
- // empty constructor
- }
- }
-
- public static PartitionInfo getInstance() {
- return PartitionInfoHolder.INSTANCE;
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index 880556015a..1a0561a551 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -48,10 +48,6 @@ public class ProcedureInfo {
CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new ConcurrentHashMap<>();
- public static ProcedureInfo getInstance() {
- return ProcedureInfoHolder.INSTANCE;
- }
-
public void load(List<Procedure> procedureList) {
try {
Files.list(Paths.get(procedureWalDir))
@@ -103,13 +99,4 @@ public class ProcedureInfo {
procWALMap.remove(procId);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
-
- private static class ProcedureInfoHolder {
-
- private static final ProcedureInfo INSTANCE = new ProcedureInfo();
-
- private ProcedureInfoHolder() {
- // Empty constructor
- }
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 2deed21569..66b3872e12 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -75,12 +75,17 @@ public class ConfigRequestExecutor {
private final ProcedureInfo procedureInfo;
- public ConfigRequestExecutor() {
- this.nodeInfo = NodeInfo.getInstance();
- this.clusterSchemaInfo = ClusterSchemaInfo.getInstance();
- this.partitionInfo = PartitionInfo.getInstance();
- this.authorInfo = AuthorInfo.getInstance();
- this.procedureInfo = ProcedureInfo.getInstance();
+ public ConfigRequestExecutor(
+ NodeInfo nodeInfo,
+ ClusterSchemaInfo clusterSchemaInfo,
+ PartitionInfo partitionInfo,
+ AuthorInfo authorInfo,
+ ProcedureInfo procedureInfo) {
+ this.nodeInfo = nodeInfo;
+ this.clusterSchemaInfo = clusterSchemaInfo;
+ this.partitionInfo = partitionInfo;
+ this.authorInfo = authorInfo;
+ this.procedureInfo = procedureInfo;
}
public DataSet executorQueryPlan(ConfigRequest req)
@@ -123,6 +128,7 @@ public class ConfigRequestExecutor {
case SetStorageGroup:
return clusterSchemaInfo.setStorageGroup((SetStorageGroupReq) req);
case DeleteStorageGroup:
+ partitionInfo.deleteStorageGroup((DeleteStorageGroupReq) req);
return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupReq) req);
case SetTTL:
return clusterSchemaInfo.setTTL((SetTTLReq) req);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ConfigProcedureStore.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ConfigProcedureStore.java
index cd061f9285..8a9eb977b7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ConfigProcedureStore.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ConfigProcedureStore.java
@@ -41,15 +41,14 @@ public class ConfigProcedureStore implements IProcedureStore {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureStore.class);
private volatile boolean isRunning = false;
- private ProcedureInfo procedureInfo = ProcedureInfo.getInstance();
+ private final ProcedureInfo procedureInfo;
private final String procedureWalDir =
CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
- private ConfigManager configManager;
+ private final ConfigManager configManager;
- public ConfigProcedureStore() {}
-
- public ConfigProcedureStore(ConfigManager configManager) {
+ public ConfigProcedureStore(ConfigManager configManager, ProcedureInfo procedureInfo) {
this.configManager = configManager;
+ this.procedureInfo = procedureInfo;
try {
checkProcWalDir(procedureWalDir);
} catch (IOException e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupProcedure.java
index 97923f493c..c174d9bccf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupProcedure.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
-import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -55,6 +55,8 @@ public class DeleteStorageGroupProcedure
private static boolean byPassForTest = false;
+ private ConfigManager configManager;
+
@TestOnly
public static void setByPassForTest(boolean byPass) {
byPassForTest = byPass;
@@ -66,9 +68,11 @@ public class DeleteStorageGroupProcedure
super();
}
- public DeleteStorageGroupProcedure(TStorageGroupSchema deleteSgSchema) {
+ public DeleteStorageGroupProcedure(
+ TStorageGroupSchema deleteSgSchema, ConfigManager configManager) {
super();
this.deleteSgSchema = deleteSgSchema;
+ this.configManager = configManager;
}
public TStorageGroupSchema getDeleteSgSchema() {
@@ -89,12 +93,14 @@ public class DeleteStorageGroupProcedure
List<TConsensusGroupId> dataRegionGroupIds = deleteSgSchema.getDataRegionGroupIds();
List<TConsensusGroupId> schemaRegionGroupIds = deleteSgSchema.getSchemaRegionGroupIds();
List<TRegionReplicaSet> dataRegionReplicaSets =
- new ArrayList<>(PartitionInfo.getInstance().getRegionReplicaSets(dataRegionGroupIds));
+ new ArrayList<>(
+ configManager.getPartitionManager().getRegionReplicaSets(dataRegionGroupIds));
List<TRegionReplicaSet> schemaRegionReplicaSets =
- new ArrayList<>(PartitionInfo.getInstance().getRegionReplicaSets(schemaRegionGroupIds));
+ new ArrayList<>(
+ configManager.getPartitionManager().getRegionReplicaSets(schemaRegionGroupIds));
try {
switch (state) {
- case DELETE_STORAGE_GROUP_PREPEARE:
+ case DELETE_STORAGE_GROUP_PREPARE:
// TODO: lock related ClusterSchemaInfo, PartitionInfo and Regions
setNextState(DeleteStorageGroupState.DELETE_DATA_REGION);
break;
@@ -210,7 +216,7 @@ public class DeleteStorageGroupProcedure
@Override
protected DeleteStorageGroupState getInitialState() {
- return DeleteStorageGroupState.DELETE_STORAGE_GROUP_PREPEARE;
+ return DeleteStorageGroupState.DELETE_STORAGE_GROUP_PREPARE;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupState.java
index eecc11578f..5e865e2fd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/DeleteStorageGroupState.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure;
public enum DeleteStorageGroupState {
- DELETE_STORAGE_GROUP_PREPEARE,
+ DELETE_STORAGE_GROUP_PREPARE,
DELETE_DATA_REGION,
DELETE_SCHEMA_REGION,
DELETE_CONFIG,
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index 4f668d407b..47675f349e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.hash;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.PartitionManager;
+import org.apache.iotdb.confignode.persistence.PartitionInfo;
import java.io.IOException;
import java.util.ArrayList;
@@ -61,7 +62,7 @@ public class DeviceGroupHashExecutorManualTest {
}
public void GeneralIndexTest() throws IOException {
- PartitionManager manager = new PartitionManager(new ConfigManager());
+ PartitionManager manager = new PartitionManager(new ConfigManager(), new PartitionInfo());
int[] bucket = new int[deviceGroupCount];
Arrays.fill(bucket, 0);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
index d1f80ff43a..b85987a280 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
@@ -59,7 +59,7 @@ public class AuthorInfoTest {
@BeforeClass
public static void setup() {
- authorInfo = AuthorInfo.getInstance();
+ authorInfo = new AuthorInfo();
if (!snapshotDir.exists()) {
snapshotDir.mkdirs();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
index 629d216597..0735a83de0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
@@ -48,7 +48,7 @@ public class ClusterSchemaInfoTest {
@BeforeClass
public static void setup() {
- clusterSchemaInfo = ClusterSchemaInfo.getInstance();
+ clusterSchemaInfo = new ClusterSchemaInfo();
if (!snapshotDir.exists()) {
snapshotDir.mkdirs();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index 2caf683a1b..febddbe2ef 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -44,7 +44,7 @@ public class NodeInfoTest {
@BeforeClass
public static void setup() {
- nodeInfo = NodeInfo.getInstance();
+ nodeInfo = new NodeInfo();
if (!snapshotDir.exists()) {
snapshotDir.mkdirs();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index 6e120afd86..e6668cd778 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -70,7 +70,7 @@ public class PartitionInfoTest {
@BeforeClass
public static void setup() {
- partitionInfo = PartitionInfo.getInstance();
+ partitionInfo = new PartitionInfo();
if (!snapshotDir.exists()) {
snapshotDir.mkdirs();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 3853edc0ca..d4fb24adee 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -37,9 +37,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.persistence.NodeInfo;
-import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.confignode.procedure.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
@@ -103,10 +100,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@After
- public void after() throws IOException {
- NodeInfo.getInstance().clear();
- ClusterSchemaInfo.getInstance().clear();
- PartitionInfo.getInstance().clear();
+ public void after() throws IOException, InterruptedException {
processor.close();
FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
FileUtils.deleteFully(