You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/24 02:15:53 UTC
[iotdb] branch master updated: [IOTDB-2945] Reconstruct ConfigNode manage layer and persistence layer (#5627)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 467add666d [IOTDB-2945] Reconstruct ConfigNode manage layer and persistence layer (#5627)
467add666d is described below
commit 467add666d41791492169447378a1d01c5780e1e
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Apr 24 10:15:47 2022 +0800
[IOTDB-2945] Reconstruct ConfigNode manage layer and persistence layer (#5627)
---
.../resources/conf/iotdb-confignode.properties | 4 +-
.../iotdb/confignode/cli/TemporaryClient.java | 38 +---
.../request/ConfigRequest.java} | 64 +++---
.../request/ConfigRequestType.java} | 4 +-
.../request/auth/AuthorReq.java} | 62 +++---
.../request/read/GetOrCreateDataPartitionReq.java} | 14 +-
.../read/GetOrCreateSchemaPartitionReq.java} | 14 +-
.../request/read/QueryDataNodeInfoReq.java} | 18 +-
.../request/read/QueryStorageGroupSchemaReq.java} | 12 +-
.../request/write/CreateDataPartitionReq.java} | 16 +-
.../request/write/CreateRegionsReq.java} | 16 +-
.../request/write/CreateSchemaPartitionReq.java} | 16 +-
.../request/write/DeleteStorageGroupReq.java} | 12 +-
.../request/write/RegisterDataNodeReq.java} | 18 +-
.../request/write/SetStorageGroupReq.java} | 18 +-
...DataSet.java => DataNodeConfigurationResp.java} | 4 +-
...ionsDataSet.java => DataNodeLocationsResp.java} | 4 +-
...artitionDataSet.java => DataPartitionResp.java} | 4 +-
...ionInfoDataSet.java => PermissionInfoResp.java} | 6 +-
...titionDataSet.java => SchemaPartitionResp.java} | 4 +-
...emaDataSet.java => StorageGroupSchemaResp.java} | 4 +-
.../statemachine/PartitionRegionStateMachine.java | 22 +-
.../physical/UnknownPhysicalPlanTypeException.java | 4 +-
...egionManager.java => ClusterSchemaManager.java} | 105 ++++++----
.../iotdb/confignode/manager/ConfigManager.java | 109 +++++-----
.../iotdb/confignode/manager/ConsensusManager.java | 6 +-
.../iotdb/confignode/manager/DataNodeManager.java | 32 ++-
.../apache/iotdb/confignode/manager/Manager.java | 36 ++--
.../iotdb/confignode/manager/PartitionManager.java | 61 +++---
.../confignode/manager/PermissionManager.java | 12 +-
...{AuthorInfoPersistence.java => AuthorInfo.java} | 60 +++---
...aNodeInfoPersistence.java => DataNodeInfo.java} | 45 ++---
...tionInfoPersistence.java => PartitionInfo.java} | 137 ++++++++++---
.../persistence/RegionInfoPersistence.java | 221 ---------------------
.../confignode/persistence/StorageGroupInfo.java | 150 ++++++++++++++
.../iotdb/confignode/service/ConfigNode.java | 25 ++-
.../confignode/service/executor/PlanExecutor.java | 80 ++++----
...odeRPCServer.java => ConfigNodeRPCService.java} | 34 +---
.../{server => }/ConfigNodeRPCServiceHandler.java | 6 +-
...erMBean.java => ConfigNodeRPCServiceMBean.java} | 4 +-
...sor.java => ConfigNodeRPCServiceProcessor.java} | 114 +++++------
...nSerDeTest.java => ConfigRequestSerDeTest.java} | 158 +++++++--------
...java => ConfigNodeRPCServiceProcessorTest.java} | 21 +-
.../apache/iotdb/commons/service/JMXService.java | 13 +-
.../iotdb/procedure/service/ProcedureNode.java | 2 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
46 files changed, 911 insertions(+), 900 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index dccb57f8ba..2b8b4272ff 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -62,7 +62,7 @@ config_node_internal_port=22278
# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
# Datatype: String
-config_node_consensus_protocol_class=org.apache.iotdb.consensus.standalone.StandAloneConsensus
+# config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
# Used for building the ConfigNode consensus group
# all config node address and internal port, use comma to distinguish
@@ -75,7 +75,7 @@ config_node_consensus_protocol_class=org.apache.iotdb.consensus.standalone.Stand
# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
# Datatype: String
-data_node_consensus_protocol_class=org.apache.iotdb.consensus.standalone.StandAloneConsensus
+# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
####################
### PartitionSlot Configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
index de570eb36f..009c47c004 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.cli;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.confignode.persistence.DataNodeInfo;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -96,9 +96,7 @@ public class TemporaryClient {
if (clients.get(dataNodeId) == null) {
buildClient(
dataNodeId,
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint());
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
}
TCreateSchemaRegionReq req = genCreateSchemaRegionReq(storageGroup, regionReplicaSet);
@@ -109,25 +107,19 @@ public class TemporaryClient {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
"Create SchemaRegion on DataNode: {} success",
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint());
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
return;
} else {
LOGGER.error(
"Create SchemaRegion on DataNode: {} failed, {}. Retrying...",
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint(),
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint(),
status);
}
} catch (TException e) {
// TODO: Handler SocketTimeOutException
LOGGER.error(
"Create SchemaRegion on DataNode: {} failed, {}. Retrying...",
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint(),
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint(),
e.toString());
try {
TimeUnit.MILLISECONDS.sleep(retryWait);
@@ -139,7 +131,7 @@ public class TemporaryClient {
}
LOGGER.error(
"Create SchemaRegion on DataNode: {} failed.",
- DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
}
private TCreateDataRegionReq genCreateDataRegionReq(
@@ -157,9 +149,7 @@ public class TemporaryClient {
if (clients.get(dataNodeId) == null) {
buildClient(
dataNodeId,
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint());
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
}
TCreateDataRegionReq req = genCreateDataRegionReq(storageGroup, regionReplicaSet, TTL);
@@ -170,25 +160,19 @@ public class TemporaryClient {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
"Create DataRegion on DataNode: {} success",
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint());
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
return;
} else {
LOGGER.error(
"Create DataRegion on DataNode: {} failed, {}. Retrying...",
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint(),
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint(),
status);
}
} catch (TException e) {
// TODO: Handler SocketTimeOutException
LOGGER.error(
"Create DataRegion on DataNode: {} failed, {}. Retrying...",
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNode(dataNodeId)
- .getInternalEndPoint(),
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint(),
e.toString());
try {
TimeUnit.MILLISECONDS.sleep(retryWait);
@@ -200,7 +184,7 @@ public class TemporaryClient {
}
LOGGER.error(
"Create DataRegion on DataNode: {} failed.",
- DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
+ DataNodeInfo.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
}
private static class TemporaryClientHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
similarity index 62%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index 7f1c18f9fb..5ddce7f919 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -16,18 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical;
+package org.apache.iotdb.confignode.consensus.request;
-import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.QueryDataNodeInfoReq;
+import org.apache.iotdb.confignode.consensus.request.read.QueryStorageGroupSchemaReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.slf4j.Logger;
@@ -37,17 +37,17 @@ import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
-public abstract class PhysicalPlan implements IConsensusRequest {
+public abstract class ConfigRequest implements IConsensusRequest {
- private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalPlan.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRequest.class);
- private final PhysicalPlanType type;
+ private final ConfigRequestType type;
- public PhysicalPlan(PhysicalPlanType type) {
+ public ConfigRequest(ConfigRequestType type) {
this.type = type;
}
- public PhysicalPlanType getType() {
+ public ConfigRequestType getType() {
return this.type;
}
@@ -80,46 +80,46 @@ public abstract class PhysicalPlan implements IConsensusRequest {
public static class Factory {
- public static PhysicalPlan create(ByteBuffer buffer) throws IOException {
+ public static ConfigRequest create(ByteBuffer buffer) throws IOException {
int typeNum = buffer.getInt();
- if (typeNum >= PhysicalPlanType.values().length) {
+ if (typeNum >= ConfigRequestType.values().length) {
throw new IOException("unrecognized log type " + typeNum);
}
- PhysicalPlanType type = PhysicalPlanType.values()[typeNum];
- PhysicalPlan plan;
+ ConfigRequestType type = ConfigRequestType.values()[typeNum];
+ ConfigRequest plan;
switch (type) {
case RegisterDataNode:
- plan = new RegisterDataNodePlan();
+ plan = new RegisterDataNodeReq();
break;
case QueryDataNodeInfo:
- plan = new QueryDataNodeInfoPlan();
+ plan = new QueryDataNodeInfoReq();
break;
case SetStorageGroup:
- plan = new SetStorageGroupPlan();
+ plan = new SetStorageGroupReq();
break;
case QueryStorageGroupSchema:
- plan = new QueryStorageGroupSchemaPlan();
+ plan = new QueryStorageGroupSchemaReq();
break;
case CreateRegions:
- plan = new CreateRegionsPlan();
+ plan = new CreateRegionsReq();
break;
case GetSchemaPartition:
- plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetSchemaPartition);
+ plan = new GetOrCreateSchemaPartitionReq(ConfigRequestType.GetSchemaPartition);
break;
case CreateSchemaPartition:
- plan = new CreateSchemaPartitionPlan();
+ plan = new CreateSchemaPartitionReq();
break;
case GetOrCreateSchemaPartition:
- plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
+ plan = new GetOrCreateSchemaPartitionReq(ConfigRequestType.GetOrCreateSchemaPartition);
break;
case GetDataPartition:
- plan = new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+ plan = new GetOrCreateDataPartitionReq(ConfigRequestType.GetDataPartition);
break;
case CreateDataPartition:
- plan = new CreateDataPartitionPlan();
+ plan = new CreateDataPartitionReq();
break;
case GetOrCreateDataPartition:
- plan = new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetOrCreateDataPartition);
+ plan = new GetOrCreateDataPartitionReq(ConfigRequestType.GetOrCreateDataPartition);
break;
case LIST_USER:
case LIST_ROLE:
@@ -138,7 +138,7 @@ public abstract class PhysicalPlan implements IConsensusRequest {
case REVOKE_ROLE:
case REVOKE_ROLE_FROM_USER:
case UPDATE_USER:
- plan = new AuthorPlan(type);
+ plan = new AuthorReq(type);
break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index f047dd9723..28ca9f4b63 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical;
+package org.apache.iotdb.confignode.consensus.request;
-public enum PhysicalPlanType {
+public enum ConfigRequestType {
RegisterDataNode,
QueryDataNodeInfo,
SetStorageGroup,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/AuthorPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/auth/AuthorReq.java
similarity index 77%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/AuthorPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/auth/AuthorReq.java
index bc90b6edfb..520f2348b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/AuthorPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/auth/AuthorReq.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.consensus.request.auth;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.db.auth.AuthException;
import java.nio.ByteBuffer;
@@ -28,9 +28,9 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
-public class AuthorPlan extends PhysicalPlan {
+public class AuthorReq extends ConfigRequest {
- private PhysicalPlanType authorType;
+ private ConfigRequestType authorType;
private String roleName;
private String password;
private String newPassword;
@@ -38,7 +38,7 @@ public class AuthorPlan extends PhysicalPlan {
private String nodeName;
private String userName;
- public AuthorPlan(PhysicalPlanType type) {
+ public AuthorReq(ConfigRequestType type) {
super(type);
authorType = type;
}
@@ -55,8 +55,8 @@ public class AuthorPlan extends PhysicalPlan {
* @param nodeName node name in Path structure
* @throws AuthException Authentication Exception
*/
- public AuthorPlan(
- PhysicalPlanType authorType,
+ public AuthorReq(
+ ConfigRequestType authorType,
String userName,
String roleName,
String password,
@@ -74,11 +74,11 @@ public class AuthorPlan extends PhysicalPlan {
this.nodeName = nodeName;
}
- public PhysicalPlanType getAuthorType() {
+ public ConfigRequestType getAuthorType() {
return authorType;
}
- public void setAuthorType(PhysicalPlanType authorType) {
+ public void setAuthorType(ConfigRequestType authorType) {
this.authorType = authorType;
}
@@ -168,62 +168,62 @@ public class AuthorPlan extends PhysicalPlan {
nodeName = BasicStructureSerDeUtil.readString(buffer);
}
- private int getPlanTypeOrdinal(PhysicalPlanType physicalPlanType) {
+ private int getPlanTypeOrdinal(ConfigRequestType configRequestType) {
int type;
- switch (physicalPlanType) {
+ switch (configRequestType) {
case CREATE_USER:
- type = PhysicalPlanType.CREATE_USER.ordinal();
+ type = ConfigRequestType.CREATE_USER.ordinal();
break;
case CREATE_ROLE:
- type = PhysicalPlanType.CREATE_ROLE.ordinal();
+ type = ConfigRequestType.CREATE_ROLE.ordinal();
break;
case DROP_USER:
- type = PhysicalPlanType.DROP_USER.ordinal();
+ type = ConfigRequestType.DROP_USER.ordinal();
break;
case DROP_ROLE:
- type = PhysicalPlanType.DROP_ROLE.ordinal();
+ type = ConfigRequestType.DROP_ROLE.ordinal();
break;
case GRANT_ROLE:
- type = PhysicalPlanType.GRANT_ROLE.ordinal();
+ type = ConfigRequestType.GRANT_ROLE.ordinal();
break;
case GRANT_USER:
- type = PhysicalPlanType.GRANT_USER.ordinal();
+ type = ConfigRequestType.GRANT_USER.ordinal();
break;
case GRANT_ROLE_TO_USER:
- type = PhysicalPlanType.GRANT_ROLE_TO_USER.ordinal();
+ type = ConfigRequestType.GRANT_ROLE_TO_USER.ordinal();
break;
case REVOKE_USER:
- type = PhysicalPlanType.REVOKE_USER.ordinal();
+ type = ConfigRequestType.REVOKE_USER.ordinal();
break;
case REVOKE_ROLE:
- type = PhysicalPlanType.REVOKE_ROLE.ordinal();
+ type = ConfigRequestType.REVOKE_ROLE.ordinal();
break;
case REVOKE_ROLE_FROM_USER:
- type = PhysicalPlanType.REVOKE_ROLE_FROM_USER.ordinal();
+ type = ConfigRequestType.REVOKE_ROLE_FROM_USER.ordinal();
break;
case UPDATE_USER:
- type = PhysicalPlanType.UPDATE_USER.ordinal();
+ type = ConfigRequestType.UPDATE_USER.ordinal();
break;
case LIST_USER:
- type = PhysicalPlanType.LIST_USER.ordinal();
+ type = ConfigRequestType.LIST_USER.ordinal();
break;
case LIST_ROLE:
- type = PhysicalPlanType.LIST_ROLE.ordinal();
+ type = ConfigRequestType.LIST_ROLE.ordinal();
break;
case LIST_USER_PRIVILEGE:
- type = PhysicalPlanType.LIST_USER_PRIVILEGE.ordinal();
+ type = ConfigRequestType.LIST_USER_PRIVILEGE.ordinal();
break;
case LIST_ROLE_PRIVILEGE:
- type = PhysicalPlanType.LIST_ROLE_PRIVILEGE.ordinal();
+ type = ConfigRequestType.LIST_ROLE_PRIVILEGE.ordinal();
break;
case LIST_USER_ROLES:
- type = PhysicalPlanType.LIST_USER_ROLES.ordinal();
+ type = ConfigRequestType.LIST_USER_ROLES.ordinal();
break;
case LIST_ROLE_USERS:
- type = PhysicalPlanType.LIST_ROLE_USERS.ordinal();
+ type = ConfigRequestType.LIST_ROLE_USERS.ordinal();
break;
default:
- throw new IllegalArgumentException("Unknown operator: " + physicalPlanType);
+ throw new IllegalArgumentException("Unknown operator: " + configRequestType);
}
return type;
}
@@ -236,7 +236,7 @@ public class AuthorPlan extends PhysicalPlan {
if (o == null || getClass() != o.getClass()) {
return false;
}
- AuthorPlan that = (AuthorPlan) o;
+ AuthorReq that = (AuthorReq) o;
return Objects.equals(authorType, that.authorType)
&& Objects.equals(userName, that.userName)
&& Objects.equals(roleName, that.roleName)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetOrCreateDataPartitionReq.java
similarity index 92%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetOrCreateDataPartitionReq.java
index 8547b7118f..a47be168d0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetOrCreateDataPartitionReq.java
@@ -16,15 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.crud;
+package org.apache.iotdb.confignode.consensus.request.read;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import java.nio.ByteBuffer;
@@ -35,12 +35,12 @@ import java.util.Map;
import java.util.Objects;
/** Get or create DataPartition by the specific partitionSlotsMap. */
-public class GetOrCreateDataPartitionPlan extends PhysicalPlan {
+public class GetOrCreateDataPartitionReq extends ConfigRequest {
private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap;
- public GetOrCreateDataPartitionPlan(PhysicalPlanType physicalPlanType) {
- super(physicalPlanType);
+ public GetOrCreateDataPartitionReq(ConfigRequestType configRequestType) {
+ super(configRequestType);
}
public Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> getPartitionSlotsMap() {
@@ -130,7 +130,7 @@ public class GetOrCreateDataPartitionPlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- GetOrCreateDataPartitionPlan that = (GetOrCreateDataPartitionPlan) o;
+ GetOrCreateDataPartitionReq that = (GetOrCreateDataPartitionReq) o;
return partitionSlotsMap.equals(that.partitionSlotsMap);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetOrCreateSchemaPartitionReq.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetOrCreateSchemaPartitionReq.java
index 2f3df69725..ad8fe6b6c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetOrCreateSchemaPartitionReq.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.crud;
+package org.apache.iotdb.confignode.consensus.request.read;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,15 +33,15 @@ import java.util.Map;
import java.util.Objects;
/** Get or create SchemaPartition by the specific partitionSlotsMap. */
-public class GetOrCreateSchemaPartitionPlan extends PhysicalPlan {
+public class GetOrCreateSchemaPartitionReq extends ConfigRequest {
// Map<StorageGroup, List<SeriesPartitionSlot>>
// Get all SchemaPartitions when the partitionSlotsMap is empty
// Get all exists SchemaPartitions in one StorageGroup when the SeriesPartitionSlot is empty
private Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap;
- public GetOrCreateSchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
- super(physicalPlanType);
+ public GetOrCreateSchemaPartitionReq(ConfigRequestType configRequestType) {
+ super(configRequestType);
}
public void setPartitionSlotsMap(Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap) {
@@ -87,7 +87,7 @@ public class GetOrCreateSchemaPartitionPlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- GetOrCreateSchemaPartitionPlan that = (GetOrCreateSchemaPartitionPlan) o;
+ GetOrCreateSchemaPartitionReq that = (GetOrCreateSchemaPartitionReq) o;
return partitionSlotsMap.equals(that.partitionSlotsMap);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/QueryDataNodeInfoReq.java
similarity index 75%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/QueryDataNodeInfoReq.java
index 503fae05ca..8310d83a9c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/QueryDataNodeInfoReq.java
@@ -16,24 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.consensus.request.read;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.nio.ByteBuffer;
import java.util.Objects;
/** Get DataNodeInfo by the specific DataNode's id. And return all when dataNodeID is set to -1. */
-public class QueryDataNodeInfoPlan extends PhysicalPlan {
+public class QueryDataNodeInfoReq extends ConfigRequest {
private int dataNodeID;
- public QueryDataNodeInfoPlan() {
- super(PhysicalPlanType.QueryDataNodeInfo);
+ public QueryDataNodeInfoReq() {
+ super(ConfigRequestType.QueryDataNodeInfo);
}
- public QueryDataNodeInfoPlan(int dataNodeID) {
+ public QueryDataNodeInfoReq(int dataNodeID) {
this();
this.dataNodeID = dataNodeID;
}
@@ -44,7 +44,7 @@ public class QueryDataNodeInfoPlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.QueryDataNodeInfo.ordinal());
+ buffer.putInt(ConfigRequestType.QueryDataNodeInfo.ordinal());
buffer.putInt(dataNodeID);
}
@@ -57,7 +57,7 @@ public class QueryDataNodeInfoPlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- QueryDataNodeInfoPlan that = (QueryDataNodeInfoPlan) o;
+ QueryDataNodeInfoReq that = (QueryDataNodeInfoReq) o;
return dataNodeID == that.dataNodeID;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryStorageGroupSchemaPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/QueryStorageGroupSchemaReq.java
similarity index 74%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryStorageGroupSchemaPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/QueryStorageGroupSchemaReq.java
index c6eaa6a624..8912a8c890 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryStorageGroupSchemaPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/QueryStorageGroupSchemaReq.java
@@ -16,17 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.consensus.request.read;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.nio.ByteBuffer;
-public class QueryStorageGroupSchemaPlan extends PhysicalPlan {
+public class QueryStorageGroupSchemaReq extends ConfigRequest {
- public QueryStorageGroupSchemaPlan() {
- super(PhysicalPlanType.QueryStorageGroupSchema);
+ public QueryStorageGroupSchemaReq() {
+ super(ConfigRequestType.QueryStorageGroupSchema);
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateDataPartitionReq.java
similarity index 91%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateDataPartitionReq.java
index 0b1bc95bba..bd87c0683b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateDataPartitionReq.java
@@ -16,15 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.crud;
+package org.apache.iotdb.confignode.consensus.request.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -35,13 +35,13 @@ import java.util.Map;
import java.util.Objects;
/** Create DataPartition by assignedDataPartition */
-public class CreateDataPartitionPlan extends PhysicalPlan {
+public class CreateDataPartitionReq extends ConfigRequest {
private Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
assignedDataPartition;
- public CreateDataPartitionPlan() {
- super(PhysicalPlanType.CreateDataPartition);
+ public CreateDataPartitionReq() {
+ super(ConfigRequestType.CreateDataPartition);
}
public Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
@@ -57,7 +57,7 @@ public class CreateDataPartitionPlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.CreateDataPartition.ordinal());
+ buffer.putInt(ConfigRequestType.CreateDataPartition.ordinal());
buffer.putInt(assignedDataPartition.size());
for (Map.Entry<
@@ -118,7 +118,7 @@ public class CreateDataPartitionPlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- CreateDataPartitionPlan that = (CreateDataPartitionPlan) o;
+ CreateDataPartitionReq that = (CreateDataPartitionReq) o;
return assignedDataPartition.equals(that.assignedDataPartition);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
similarity index 85%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
index e5ceca415b..7eb7bb672f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.crud;
+package org.apache.iotdb.confignode.consensus.request.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -31,14 +31,14 @@ import java.util.List;
import java.util.Objects;
/** Create regions for specific StorageGroup */
-public class CreateRegionsPlan extends PhysicalPlan {
+public class CreateRegionsReq extends ConfigRequest {
private String storageGroup;
private final List<TRegionReplicaSet> regionReplicaSets;
- public CreateRegionsPlan() {
- super(PhysicalPlanType.CreateRegions);
+ public CreateRegionsReq() {
+ super(ConfigRequestType.CreateRegions);
this.regionReplicaSets = new ArrayList<>();
}
@@ -60,7 +60,7 @@ public class CreateRegionsPlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.CreateRegions.ordinal());
+ buffer.putInt(ConfigRequestType.CreateRegions.ordinal());
BasicStructureSerDeUtil.write(storageGroup, buffer);
@@ -84,7 +84,7 @@ public class CreateRegionsPlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- CreateRegionsPlan that = (CreateRegionsPlan) o;
+ CreateRegionsReq that = (CreateRegionsReq) o;
return storageGroup.equals(that.storageGroup)
&& regionReplicaSets.equals(that.regionReplicaSets);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateSchemaPartitionReq.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateSchemaPartitionReq.java
index 417227b920..ea6e91ea78 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateSchemaPartitionReq.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.crud;
+package org.apache.iotdb.confignode.consensus.request.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -32,12 +32,12 @@ import java.util.Map;
import java.util.Objects;
/** Create SchemaPartition by assignedSchemaPartition */
-public class CreateSchemaPartitionPlan extends PhysicalPlan {
+public class CreateSchemaPartitionReq extends ConfigRequest {
private Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> assignedSchemaPartition;
- public CreateSchemaPartitionPlan() {
- super(PhysicalPlanType.CreateSchemaPartition);
+ public CreateSchemaPartitionReq() {
+ super(ConfigRequestType.CreateSchemaPartition);
}
public Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> getAssignedSchemaPartition() {
@@ -51,7 +51,7 @@ public class CreateSchemaPartitionPlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.CreateSchemaPartition.ordinal());
+ buffer.putInt(ConfigRequestType.CreateSchemaPartition.ordinal());
buffer.putInt(assignedSchemaPartition.size());
assignedSchemaPartition.forEach(
@@ -89,7 +89,7 @@ public class CreateSchemaPartitionPlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- CreateSchemaPartitionPlan that = (CreateSchemaPartitionPlan) o;
+ CreateSchemaPartitionReq that = (CreateSchemaPartitionReq) o;
return assignedSchemaPartition.equals(that.assignedSchemaPartition);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DeleteStorageGroupPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteStorageGroupReq.java
similarity index 74%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DeleteStorageGroupPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteStorageGroupReq.java
index 46c672bac5..04d7e5d5e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DeleteStorageGroupPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DeleteStorageGroupReq.java
@@ -16,19 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.consensus.request.write;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.nio.ByteBuffer;
-public class DeleteStorageGroupPlan extends PhysicalPlan {
+public class DeleteStorageGroupReq extends ConfigRequest {
// TODO: @YongzaoDan
- public DeleteStorageGroupPlan() {
- super(PhysicalPlanType.DeleteStorageGroup);
+ public DeleteStorageGroupReq() {
+ super(ConfigRequestType.DeleteStorageGroup);
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RegisterDataNodeReq.java
similarity index 76%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RegisterDataNodeReq.java
index 01a8199ee6..1c51723278 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RegisterDataNodeReq.java
@@ -16,25 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.consensus.request.write;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class RegisterDataNodePlan extends PhysicalPlan {
+public class RegisterDataNodeReq extends ConfigRequest {
private TDataNodeLocation location;
- public RegisterDataNodePlan() {
- super(PhysicalPlanType.RegisterDataNode);
+ public RegisterDataNodeReq() {
+ super(ConfigRequestType.RegisterDataNode);
}
- public RegisterDataNodePlan(TDataNodeLocation location) {
+ public RegisterDataNodeReq(TDataNodeLocation location) {
this();
this.location = location;
}
@@ -45,7 +45,7 @@ public class RegisterDataNodePlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.RegisterDataNode.ordinal());
+ buffer.putInt(ConfigRequestType.RegisterDataNode.ordinal());
ThriftCommonsSerDeUtils.writeTDataNodeLocation(location, buffer);
}
@@ -58,7 +58,7 @@ public class RegisterDataNodePlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- RegisterDataNodePlan plan = (RegisterDataNodePlan) o;
+ RegisterDataNodeReq plan = (RegisterDataNodeReq) o;
return location.equals(plan.location);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SetStorageGroupReq.java
similarity index 78%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SetStorageGroupReq.java
index 80078672e2..095a641c88 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SetStorageGroupReq.java
@@ -16,27 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.consensus.request.write;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class SetStorageGroupPlan extends PhysicalPlan {
+public class SetStorageGroupReq extends ConfigRequest {
private TStorageGroupSchema schema;
- public SetStorageGroupPlan() {
- super(PhysicalPlanType.SetStorageGroup);
+ public SetStorageGroupReq() {
+ super(ConfigRequestType.SetStorageGroup);
this.schema = new TStorageGroupSchema();
}
- public SetStorageGroupPlan(TStorageGroupSchema schema) {
+ public SetStorageGroupReq(TStorageGroupSchema schema) {
this();
this.schema = schema;
}
@@ -51,7 +51,7 @@ public class SetStorageGroupPlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.SetStorageGroup.ordinal());
+ buffer.putInt(ConfigRequestType.SetStorageGroup.ordinal());
ThriftConfigNodeSerDeUtils.writeTStorageGroupSchema(schema, buffer);
}
@@ -64,7 +64,7 @@ public class SetStorageGroupPlan extends PhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- SetStorageGroupPlan that = (SetStorageGroupPlan) o;
+ SetStorageGroupReq that = (SetStorageGroupReq) o;
return schema.equals(that.schema);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
index 178210be01..d4301d0f42 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
@@ -24,13 +24,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
-public class DataNodeConfigurationDataSet implements DataSet {
+public class DataNodeConfigurationResp implements DataSet {
private TSStatus status;
private Integer dataNodeId;
private TGlobalConfig globalConfig;
- public DataNodeConfigurationDataSet() {
+ public DataNodeConfigurationResp() {
this.dataNodeId = null;
this.globalConfig = null;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsResp.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsDataSet.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsResp.java
index db23bbc4b9..0331e40371 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsResp.java
@@ -26,12 +26,12 @@ import org.apache.iotdb.rpc.TSStatusCode;
import java.util.Map;
-public class DataNodeLocationsDataSet implements DataSet {
+public class DataNodeLocationsResp implements DataSet {
private TSStatus status;
private Map<Integer, TDataNodeLocation> dataNodeLocationMap;
- public DataNodeLocationsDataSet() {
+ public DataNodeLocationsResp() {
// empty constructor
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionResp.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionResp.java
index 15da627333..7b1e01407b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionResp.java
@@ -25,13 +25,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
-public class DataPartitionDataSet implements DataSet {
+public class DataPartitionResp implements DataSet {
private TSStatus status;
private DataPartition dataPartition;
- public DataPartitionDataSet() {
+ public DataPartitionResp() {
// Empty constructor
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PermissionInfoDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PermissionInfoResp.java
similarity index 89%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PermissionInfoDataSet.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PermissionInfoResp.java
index 093f959c5f..50fde2a033 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PermissionInfoDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PermissionInfoResp.java
@@ -25,15 +25,15 @@ import org.apache.iotdb.consensus.common.DataSet;
import java.util.List;
import java.util.Map;
-public class PermissionInfoDataSet implements DataSet {
+public class PermissionInfoResp implements DataSet {
private TSStatus status;
private Map<String, List<String>> permissionInfo;
- public PermissionInfoDataSet() {}
+ public PermissionInfoResp() {}
- public PermissionInfoDataSet(TSStatus status, Map<String, List<String>> permissionInfo) {
+ public PermissionInfoResp(TSStatus status, Map<String, List<String>> permissionInfo) {
this.status = status;
this.permissionInfo = permissionInfo;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
index 5ed93b7c05..2b99ea664f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionResp.java
@@ -25,13 +25,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
-public class SchemaPartitionDataSet implements DataSet {
+public class SchemaPartitionResp implements DataSet {
private TSStatus status;
private SchemaPartition schemaPartition;
- public SchemaPartitionDataSet() {
+ public SchemaPartitionResp() {
// empty constructor
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
index 0874de13e3..568ad2ff18 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
@@ -28,13 +28,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class StorageGroupSchemaDataSet implements DataSet {
+public class StorageGroupSchemaResp implements DataSet {
private TSStatus status;
private List<TStorageGroupSchema> schemaList;
- public StorageGroupSchemaDataSet() {}
+ public StorageGroupSchemaResp() {}
public TSStatus getStatus() {
return status;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 38bd8b1305..73aa4410d1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.confignode.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.service.executor.PlanExecutor;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.SnapshotMeta;
@@ -50,16 +50,16 @@ public class PartitionRegionStateMachine implements IStateMachine {
@Override
public TSStatus write(IConsensusRequest request) {
- PhysicalPlan plan;
+ ConfigRequest plan;
if (request instanceof ByteBufferConsensusRequest) {
try {
- plan = PhysicalPlan.Factory.create(((ByteBufferConsensusRequest) request).getContent());
+ plan = ConfigRequest.Factory.create(((ByteBufferConsensusRequest) request).getContent());
} catch (IOException e) {
LOGGER.error("Deserialization error for write plan : {}", request);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
- } else if (request instanceof PhysicalPlan) {
- plan = (PhysicalPlan) request;
+ } else if (request instanceof ConfigRequest) {
+ plan = (ConfigRequest) request;
} else {
LOGGER.error("Unexpected write plan : {}", request);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -68,7 +68,7 @@ public class PartitionRegionStateMachine implements IStateMachine {
}
/** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
- protected TSStatus write(PhysicalPlan plan) {
+ protected TSStatus write(ConfigRequest plan) {
TSStatus result;
try {
result = executor.executorNonQueryPlan(plan);
@@ -81,16 +81,16 @@ public class PartitionRegionStateMachine implements IStateMachine {
@Override
public DataSet read(IConsensusRequest request) {
- PhysicalPlan plan;
+ ConfigRequest plan;
if (request instanceof ByteBufferConsensusRequest) {
try {
- plan = PhysicalPlan.Factory.create(((ByteBufferConsensusRequest) request).getContent());
+ plan = ConfigRequest.Factory.create(((ByteBufferConsensusRequest) request).getContent());
} catch (IOException e) {
LOGGER.error("Deserialization error for write plan : {}", request);
return null;
}
- } else if (request instanceof PhysicalPlan) {
- plan = (PhysicalPlan) request;
+ } else if (request instanceof ConfigRequest) {
+ plan = (ConfigRequest) request;
} else {
LOGGER.error("Unexpected read plan : {}", request);
return null;
@@ -113,7 +113,7 @@ public class PartitionRegionStateMachine implements IStateMachine {
public void cleanUpOldSnapshots(File snapshotDir) {}
/** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
- protected DataSet read(PhysicalPlan plan) {
+ protected DataSet read(ConfigRequest plan) {
DataSet result;
try {
result = executor.executorQueryPlan(plan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/exception/physical/UnknownPhysicalPlanTypeException.java b/confignode/src/main/java/org/apache/iotdb/confignode/exception/physical/UnknownPhysicalPlanTypeException.java
index e2149a0ce5..e29bf3adad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/exception/physical/UnknownPhysicalPlanTypeException.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/exception/physical/UnknownPhysicalPlanTypeException.java
@@ -18,12 +18,12 @@
*/
package org.apache.iotdb.confignode.exception.physical;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.exception.ConfigNodeException;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
public class UnknownPhysicalPlanTypeException extends ConfigNodeException {
- public UnknownPhysicalPlanTypeException(PhysicalPlanType type) {
+ public UnknownPhysicalPlanTypeException(ConfigRequestType type) {
super(String.format("Unknown PhysicalPlanType: %d", type.ordinal()));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
similarity index 63%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index dfbacf4b2e..ec3c0f7871 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -27,19 +26,19 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.cli.TemporaryClient;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
-import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.read.QueryStorageGroupSchemaReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
+import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.StorageGroupInfo;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.rpc.TSStatusCode;
import java.util.Collections;
import java.util.List;
-/** manage data partition and schema partition */
-public class RegionManager {
+public class ClusterSchemaManager {
private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
private static final int schemaReplicationFactor = conf.getSchemaReplicationFactor();
@@ -47,51 +46,48 @@ public class RegionManager {
private static final int initialSchemaRegionCount = conf.getInitialSchemaRegionCount();
private static final int initialDataRegionCount = conf.getInitialDataRegionCount();
- private static final RegionInfoPersistence regionInfoPersistence =
- RegionInfoPersistence.getInstance();
-
- private final Manager configNodeManager;
+ private static final StorageGroupInfo storageGroupInfo = StorageGroupInfo.getInstance();
+ private static final PartitionInfo partitionInfo = PartitionInfo.getInstance();
- public RegionManager(Manager configNodeManager) {
- this.configNodeManager = configNodeManager;
- }
+ private final Manager configManager;
- private ConsensusManager getConsensusManager() {
- return configNodeManager.getConsensusManager();
+ public ClusterSchemaManager(Manager configManager) {
+ this.configManager = configManager;
}
/**
* Set StorageGroup and allocate the default amount Regions
*
- * @param plan SetStorageGroupPlan
+ * @param setPlan SetStorageGroupPlan
* @return SUCCESS_STATUS if the StorageGroup is set and region allocation successful.
* NOT_ENOUGH_DATA_NODE if there are not enough DataNode for Region allocation.
* STORAGE_GROUP_ALREADY_EXISTS if the StorageGroup is already set.
*/
- public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
+ public TSStatus setStorageGroup(SetStorageGroupReq setPlan) {
TSStatus result;
- if (configNodeManager.getDataNodeManager().getOnlineDataNodeCount()
+ if (configManager.getDataNodeManager().getOnlineDataNodeCount()
< Math.max(initialSchemaRegionCount, initialDataRegionCount)) {
result = new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
result.setMessage("DataNode is not enough, please register more.");
} else {
- if (regionInfoPersistence.containsStorageGroup(plan.getSchema().getName())) {
+ if (storageGroupInfo.containsStorageGroup(setPlan.getSchema().getName())) {
result = new TSStatus(TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode());
result.setMessage(
- String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
+ String.format("StorageGroup %s is already set.", setPlan.getSchema().getName()));
} else {
- CreateRegionsPlan createPlan = new CreateRegionsPlan();
- createPlan.setStorageGroup(plan.getSchema().getName());
+ CreateRegionsReq createPlan = new CreateRegionsReq();
+ createPlan.setStorageGroup(setPlan.getSchema().getName());
// Allocate default Regions
- allocateRegions(TConsensusGroupType.SchemaRegion, createPlan);
- allocateRegions(TConsensusGroupType.DataRegion, createPlan);
+ allocateRegions(TConsensusGroupType.SchemaRegion, createPlan, setPlan);
+ allocateRegions(TConsensusGroupType.DataRegion, createPlan, setPlan);
// Persist StorageGroup and Regions
- getConsensusManager().write(plan);
+ getConsensusManager().write(setPlan);
result = getConsensusManager().write(createPlan).getStatus();
// Create Regions in DataNode
+ // TODO: use client pool
for (TRegionReplicaSet regionReplicaSet : createPlan.getRegionReplicaSets()) {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
switch (regionReplicaSet.getRegionId().getType()) {
@@ -108,7 +104,7 @@ public class RegionManager {
dataNodeLocation.getDataNodeId(),
createPlan.getStorageGroup(),
regionReplicaSet,
- plan.getSchema().getTTL());
+ setPlan.getSchema().getTTL());
}
}
}
@@ -117,11 +113,9 @@ public class RegionManager {
return result;
}
- private DataNodeManager getDataNodeInfoManager() {
- return configNodeManager.getDataNodeManager();
- }
-
- private void allocateRegions(TConsensusGroupType type, CreateRegionsPlan plan) {
+ /** TODO: Allocate by LoadManager */
+ private void allocateRegions(
+ TConsensusGroupType type, CreateRegionsReq createRegionsReq, SetStorageGroupReq setSGReq) {
// TODO: Use CopySet algorithm to optimize region allocation policy
@@ -139,20 +133,53 @@ public class RegionManager {
TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
TConsensusGroupId consensusGroupId =
- new TConsensusGroupId(type, regionInfoPersistence.generateNextRegionGroupId());
+ new TConsensusGroupId(type, partitionInfo.generateNextRegionGroupId());
regionReplicaSet.setRegionId(consensusGroupId);
regionReplicaSet.setDataNodeLocations(onlineDataNodes.subList(0, replicaCount));
- plan.addRegion(regionReplicaSet);
+ createRegionsReq.addRegion(regionReplicaSet);
+
+ switch (type) {
+ case SchemaRegion:
+ setSGReq.getSchema().addToSchemaRegionGroupIds(consensusGroupId);
+ break;
+ case DataRegion:
+ setSGReq.getSchema().addToDataRegionGroupIds(consensusGroupId);
+ }
}
}
- public StorageGroupSchemaDataSet getStorageGroupSchema() {
+ /**
+ * Get the SchemaRegionGroupIds or DataRegionGroupIds from the specific StorageGroup
+ *
+ * @param storageGroup StorageGroupName
+ * @param type SchemaRegion or DataRegion
+ * @return All SchemaRegionGroupIds when type is SchemaRegion, and all DataRegionGroupIds when
+ * type is DataRegion
+ */
+ public List<TConsensusGroupId> getRegionGroupIds(String storageGroup, TConsensusGroupType type) {
+ return storageGroupInfo.getRegionGroupIds(storageGroup, type);
+ }
+
+ /**
+ * Get all the StorageGroupSchema
+ *
+ * @return StorageGroupSchemaDataSet
+ */
+ public StorageGroupSchemaResp getStorageGroupSchema() {
ConsensusReadResponse readResponse =
- getConsensusManager().read(new QueryStorageGroupSchemaPlan());
- return (StorageGroupSchemaDataSet) readResponse.getDataset();
+ getConsensusManager().read(new QueryStorageGroupSchemaReq());
+ return (StorageGroupSchemaResp) readResponse.getDataset();
}
public List<String> getStorageGroupNames() {
- return regionInfoPersistence.getStorageGroupNames();
+ return storageGroupInfo.getStorageGroupNames();
+ }
+
+ private DataNodeManager getDataNodeInfoManager() {
+ return configManager.getDataNodeManager();
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configManager.getConsensusManager();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 5ac486eee4..873f74c843 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -21,20 +21,20 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsDataSet;
-import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
-import org.apache.iotdb.confignode.consensus.response.PermissionInfoDataSet;
-import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
-import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.QueryDataNodeInfoReq;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
+import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -53,24 +53,25 @@ public class ConfigManager implements Manager {
private static final TSStatus ERROR_TSSTATUS =
new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- /** manage consensus, write or read consensus */
+ /** Manage PartitionTable read/write requests through the ConsensusLayer */
private final ConsensusManager consensusManager;
- /** manage data node */
+ /** Manage cluster DataNode information */
private final DataNodeManager dataNodeManager;
- /** manage assign data partition and schema partition */
- private final PartitionManager partitionManager;
+ /** Manage cluster schema */
+ private final ClusterSchemaManager clusterSchemaManager;
- /** manager assign schema region and data region */
- private final RegionManager regionManager;
+ /** Manage cluster regions and partitions */
+ private final PartitionManager partitionManager;
+ /** Manage cluster authorization */
private final PermissionManager permissionManager;
public ConfigManager() throws IOException {
this.dataNodeManager = new DataNodeManager(this);
this.partitionManager = new PartitionManager(this);
- this.regionManager = new RegionManager(this);
+ this.clusterSchemaManager = new ClusterSchemaManager(this);
this.consensusManager = new ConsensusManager();
this.permissionManager = new PermissionManager(this);
}
@@ -85,24 +86,24 @@ public class ConfigManager implements Manager {
}
@Override
- public DataSet registerDataNode(PhysicalPlan physicalPlan) {
+ public DataSet registerDataNode(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return dataNodeManager.registerDataNode((RegisterDataNodePlan) physicalPlan);
+ return dataNodeManager.registerDataNode((RegisterDataNodeReq) configRequest);
} else {
- DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
+ DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
dataSet.setStatus(status);
return dataSet;
}
}
@Override
- public DataSet getDataNodeInfo(PhysicalPlan physicalPlan) {
+ public DataSet getDataNodeInfo(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoPlan) physicalPlan);
+ return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoReq) configRequest);
} else {
- DataNodeLocationsDataSet dataSet = new DataNodeLocationsDataSet();
+ DataNodeLocationsResp dataSet = new DataNodeLocationsResp();
dataSet.setStatus(status);
return dataSet;
}
@@ -112,19 +113,19 @@ public class ConfigManager implements Manager {
public DataSet getStorageGroupSchema() {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return regionManager.getStorageGroupSchema();
+ return clusterSchemaManager.getStorageGroupSchema();
} else {
- StorageGroupSchemaDataSet dataSet = new StorageGroupSchemaDataSet();
+ StorageGroupSchemaResp dataSet = new StorageGroupSchemaResp();
dataSet.setStatus(status);
return dataSet;
}
}
@Override
- public TSStatus setStorageGroup(PhysicalPlan physicalPlan) {
+ public TSStatus setStorageGroup(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return regionManager.setStorageGroup((SetStorageGroupPlan) physicalPlan);
+ return clusterSchemaManager.setStorageGroup((SetStorageGroupReq) configRequest);
} else {
return status;
}
@@ -135,10 +136,10 @@ public class ConfigManager implements Manager {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<String> devicePaths = patternTree.findAllDevicePaths();
- List<String> storageGroups = getRegionManager().getStorageGroupNames();
+ List<String> storageGroups = getClusterSchemaManager().getStorageGroupNames();
- GetOrCreateSchemaPartitionPlan getSchemaPartitionPlan =
- new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetSchemaPartition);
+ GetOrCreateSchemaPartitionReq getSchemaPartitionPlan =
+ new GetOrCreateSchemaPartitionReq(ConfigRequestType.GetSchemaPartition);
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
boolean getAll = false;
@@ -181,7 +182,7 @@ public class ConfigManager implements Manager {
getSchemaPartitionPlan.setPartitionSlotsMap(partitionSlotsMap);
return partitionManager.getSchemaPartition(getSchemaPartitionPlan);
} else {
- SchemaPartitionDataSet dataSet = new SchemaPartitionDataSet();
+ SchemaPartitionResp dataSet = new SchemaPartitionResp();
dataSet.setStatus(status);
return dataSet;
}
@@ -192,10 +193,10 @@ public class ConfigManager implements Manager {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<String> devicePaths = patternTree.findAllDevicePaths();
- List<String> storageGroups = getRegionManager().getStorageGroupNames();
+ List<String> storageGroups = getClusterSchemaManager().getStorageGroupNames();
- GetOrCreateSchemaPartitionPlan getOrCreateSchemaPartitionPlan =
- new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
+ GetOrCreateSchemaPartitionReq getOrCreateSchemaPartitionReq =
+ new GetOrCreateSchemaPartitionReq(ConfigRequestType.GetOrCreateSchemaPartition);
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
for (String devicePath : devicePaths) {
@@ -212,34 +213,34 @@ public class ConfigManager implements Manager {
}
}
- getOrCreateSchemaPartitionPlan.setPartitionSlotsMap(partitionSlotsMap);
- return partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
+ getOrCreateSchemaPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
+ return partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionReq);
} else {
- SchemaPartitionDataSet dataSet = new SchemaPartitionDataSet();
+ SchemaPartitionResp dataSet = new SchemaPartitionResp();
dataSet.setStatus(status);
return dataSet;
}
}
@Override
- public DataSet getDataPartition(PhysicalPlan physicalPlan) {
+ public DataSet getDataPartition(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return partitionManager.getDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+ return partitionManager.getDataPartition((GetOrCreateDataPartitionReq) configRequest);
} else {
- DataPartitionDataSet dataSet = new DataPartitionDataSet();
+ DataPartitionResp dataSet = new DataPartitionResp();
dataSet.setStatus(status);
return dataSet;
}
}
@Override
- public DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan) {
+ public DataSet getOrCreateDataPartition(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return partitionManager.getOrCreateDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+ return partitionManager.getOrCreateDataPartition((GetOrCreateDataPartitionReq) configRequest);
} else {
- DataPartitionDataSet dataSet = new DataPartitionDataSet();
+ DataPartitionResp dataSet = new DataPartitionResp();
dataSet.setStatus(status);
return dataSet;
}
@@ -261,8 +262,8 @@ public class ConfigManager implements Manager {
}
@Override
- public RegionManager getRegionManager() {
- return regionManager;
+ public ClusterSchemaManager getClusterSchemaManager() {
+ return clusterSchemaManager;
}
@Override
@@ -276,22 +277,22 @@ public class ConfigManager implements Manager {
}
@Override
- public TSStatus operatePermission(PhysicalPlan physicalPlan) {
+ public TSStatus operatePermission(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return permissionManager.operatePermission((AuthorPlan) physicalPlan);
+ return permissionManager.operatePermission((AuthorReq) configRequest);
} else {
return status;
}
}
@Override
- public DataSet queryPermission(PhysicalPlan physicalPlan) {
+ public DataSet queryPermission(ConfigRequest configRequest) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return permissionManager.queryPermission((AuthorPlan) physicalPlan);
+ return permissionManager.queryPermission((AuthorReq) configRequest);
} else {
- PermissionInfoDataSet dataSet = new PermissionInfoDataSet();
+ PermissionInfoResp dataSet = new PermissionInfoResp();
dataSet.setStatus(status);
return dataSet;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 0058416429..72584966bd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
@@ -88,12 +88,12 @@ public class ConsensusManager {
}
/** Transmit PhysicalPlan to confignode.consensus.statemachine */
- public ConsensusWriteResponse write(PhysicalPlan plan) {
+ public ConsensusWriteResponse write(ConfigRequest plan) {
return consensusImpl.write(consensusGroupId, plan);
}
/** Transmit PhysicalPlan to confignode.consensus.statemachine */
- public ConsensusReadResponse read(PhysicalPlan plan) {
+ public ConsensusReadResponse read(ConfigRequest plan) {
return consensusImpl.read(consensusGroupId, plan);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
index 6f28782cd1..17c150d064 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsDataSet;
-import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.read.QueryDataNodeInfoReq;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
+import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
+import org.apache.iotdb.confignode.persistence.DataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -42,8 +42,7 @@ public class DataNodeManager {
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeManager.class);
- private static final DataNodeInfoPersistence dataNodeInfoPersistence =
- DataNodeInfoPersistence.getInstance();
+ private static final DataNodeInfo dataNodeInfo = DataNodeInfo.getInstance();
private final Manager configManager;
@@ -54,7 +53,7 @@ public class DataNodeManager {
this.configManager = configManager;
}
- private void setGlobalConfig(DataNodeConfigurationDataSet dataSet) {
+ private void setGlobalConfig(DataNodeConfigurationResp dataSet) {
// Set TGlobalConfig
TGlobalConfig globalConfig = new TGlobalConfig();
globalConfig.setDataNodeConsensusProtocolClass(
@@ -75,17 +74,16 @@ public class DataNodeManager {
* @return DataNodeConfigurationDataSet. The TSStatus will be set to SUCCESS_STATUS when register
* success, and DATANODE_ALREADY_REGISTERED when the DataNode is already exist.
*/
- public DataSet registerDataNode(RegisterDataNodePlan plan) {
- DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
+ public DataSet registerDataNode(RegisterDataNodeReq plan) {
+ DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
- if (DataNodeInfoPersistence.getInstance().containsValue(plan.getLocation())) {
+ if (DataNodeInfo.getInstance().containsValue(plan.getLocation())) {
TSStatus status = new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
status.setMessage("DataNode already registered.");
dataSet.setStatus(status);
} else {
// Persist DataNodeInfo
- plan.getLocation()
- .setDataNodeId(DataNodeInfoPersistence.getInstance().generateNextDataNodeId());
+ plan.getLocation().setDataNodeId(DataNodeInfo.getInstance().generateNextDataNodeId());
ConsensusWriteResponse resp = getConsensusManager().write(plan);
dataSet.setStatus(resp.getStatus());
}
@@ -102,16 +100,16 @@ public class DataNodeManager {
* @return The specific DataNode's info or all DataNode info if dataNodeId in
* QueryDataNodeInfoPlan is -1
*/
- public DataNodeLocationsDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
- return (DataNodeLocationsDataSet) getConsensusManager().read(plan).getDataset();
+ public DataNodeLocationsResp getDataNodeInfo(QueryDataNodeInfoReq plan) {
+ return (DataNodeLocationsResp) getConsensusManager().read(plan).getDataset();
}
public int getOnlineDataNodeCount() {
- return dataNodeInfoPersistence.getOnlineDataNodeCount();
+ return dataNodeInfo.getOnlineDataNodeCount();
}
public List<TDataNodeLocation> getOnlineDataNodes() {
- return dataNodeInfoPersistence.getOnlineDataNodes();
+ return dataNodeInfo.getOnlineDataNodes();
}
private ConsensusManager getConsensusManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index cae13c395e..539f29ff68 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
@@ -51,11 +51,11 @@ public interface Manager {
ConsensusManager getConsensusManager();
/**
- * Get RegionManager
+ * Get ClusterSchemaManager
*
- * @return RegionManager instance
+ * @return ClusterSchemaManager instance
*/
- RegionManager getRegionManager();
+ ClusterSchemaManager getClusterSchemaManager();
/**
* Get PartitionManager
@@ -67,18 +67,18 @@ public interface Manager {
/**
* Register DataNode
*
- * @param physicalPlan RegisterDataNodePlan
+ * @param configRequest RegisterDataNodePlan
* @return DataNodeConfigurationDataSet
*/
- DataSet registerDataNode(PhysicalPlan physicalPlan);
+ DataSet registerDataNode(ConfigRequest configRequest);
/**
* Get DataNode info
*
- * @param physicalPlan QueryDataNodeInfoPlan
+ * @param configRequest QueryDataNodeInfoPlan
* @return DataNodesInfoDataSet
*/
- DataSet getDataNodeInfo(PhysicalPlan physicalPlan);
+ DataSet getDataNodeInfo(ConfigRequest configRequest);
/**
* Get StorageGroupSchemas
@@ -90,10 +90,10 @@ public interface Manager {
/**
* Set StorageGroup
*
- * @param physicalPlan SetStorageGroupPlan
+ * @param configRequest SetStorageGroupReq
* @return status
*/
- TSStatus setStorageGroup(PhysicalPlan physicalPlan);
+ TSStatus setStorageGroup(ConfigRequest configRequest);
/**
* Get SchemaPartition
@@ -112,32 +112,32 @@ public interface Manager {
/**
* Get DataPartition
*
- * @param physicalPlan DataPartitionPlan
+ * @param configRequest DataPartitionPlan
* @return DataPartitionDataSet
*/
- DataSet getDataPartition(PhysicalPlan physicalPlan);
+ DataSet getDataPartition(ConfigRequest configRequest);
/**
* Get or create DataPartition
*
- * @param physicalPlan DataPartitionPlan
+ * @param configRequest DataPartitionPlan
* @return DataPartitionDataSet
*/
- DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan);
+ DataSet getOrCreateDataPartition(ConfigRequest configRequest);
/**
* Operate Permission
*
- * @param physicalPlan AuthorPlan
+ * @param configRequest AuthorPlan
* @return status
*/
- TSStatus operatePermission(PhysicalPlan physicalPlan);
+ TSStatus operatePermission(ConfigRequest configRequest);
/**
* Query Permission
*
- * @param physicalPlan AuthorPlan
+ * @param configRequest AuthorPlan
* @return PermissionInfoDataSet
*/
- DataSet queryPermission(PhysicalPlan physicalPlan);
+ DataSet queryPermission(ConfigRequest configRequest);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 607e4bba0e..98aac95300 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -18,20 +18,21 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
-import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
-import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
-import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
-import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
+import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.StorageGroupInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
@@ -49,8 +50,8 @@ public class PartitionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
- private static final PartitionInfoPersistence partitionInfoPersistence =
- PartitionInfoPersistence.getInstance();
+ private static final StorageGroupInfo storageGroupInfo = StorageGroupInfo.getInstance();
+ private static final PartitionInfo partitionInfo = PartitionInfo.getInstance();
private final Manager configNodeManager;
@@ -71,11 +72,11 @@ public class PartitionManager {
* @param physicalPlan SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionDataSet that contains only existing SchemaPartition
*/
- public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
- SchemaPartitionDataSet schemaPartitionDataSet;
+ public DataSet getSchemaPartition(GetOrCreateSchemaPartitionReq physicalPlan) {
+ SchemaPartitionResp schemaPartitionResp;
ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
- schemaPartitionDataSet = (SchemaPartitionDataSet) consensusReadResponse.getDataset();
- return schemaPartitionDataSet;
+ schemaPartitionResp = (SchemaPartitionResp) consensusReadResponse.getDataset();
+ return schemaPartitionResp;
}
/**
@@ -84,10 +85,9 @@ public class PartitionManager {
* @param physicalPlan SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionDataSet
*/
- public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
+ public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionReq physicalPlan) {
Map<String, List<TSeriesPartitionSlot>> noAssignedSchemaPartitionSlots =
- partitionInfoPersistence.filterNoAssignedSchemaPartitionSlots(
- physicalPlan.getPartitionSlotsMap());
+ partitionInfo.filterNoAssignedSchemaPartitionSlots(physicalPlan.getPartitionSlotsMap());
if (noAssignedSchemaPartitionSlots.size() > 0) {
// Allocate SchemaPartition
@@ -95,7 +95,7 @@ public class PartitionManager {
allocateSchemaPartition(noAssignedSchemaPartitionSlots);
// Persist SchemaPartition
- CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+ CreateSchemaPartitionReq createPlan = new CreateSchemaPartitionReq();
createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
getConsensusManager().write(createPlan);
}
@@ -116,8 +116,9 @@ public class PartitionManager {
for (String storageGroup : noAssignedSchemaPartitionSlotsMap.keySet()) {
List<TSeriesPartitionSlot> noAssignedPartitionSlots =
noAssignedSchemaPartitionSlotsMap.get(storageGroup);
- List<TRegionReplicaSet> schemaRegionEndPoints =
- RegionInfoPersistence.getInstance().getSchemaRegionEndPoint(storageGroup);
+ List<TRegionReplicaSet> schemaRegionReplicaSets =
+ partitionInfo.getRegionReplicaSets(
+ storageGroupInfo.getRegionGroupIds(storageGroup, TConsensusGroupType.SchemaRegion));
Random random = new Random();
Map<TSeriesPartitionSlot, TRegionReplicaSet> allocateResult = new HashMap<>();
@@ -125,7 +126,7 @@ public class PartitionManager {
seriesPartitionSlot ->
allocateResult.put(
seriesPartitionSlot,
- schemaRegionEndPoints.get(random.nextInt(schemaRegionEndPoints.size()))));
+ schemaRegionReplicaSets.get(random.nextInt(schemaRegionReplicaSets.size()))));
result.put(storageGroup, allocateResult);
}
@@ -140,11 +141,11 @@ public class PartitionManager {
* List<TimePartitionSlot>>>
* @return DataPartitionDataSet that contains only existing DataPartition
*/
- public DataSet getDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
- DataPartitionDataSet dataPartitionDataSet;
+ public DataSet getDataPartition(GetOrCreateDataPartitionReq physicalPlan) {
+ DataPartitionResp dataPartitionResp;
ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
- dataPartitionDataSet = (DataPartitionDataSet) consensusReadResponse.getDataset();
- return dataPartitionDataSet;
+ dataPartitionResp = (DataPartitionResp) consensusReadResponse.getDataset();
+ return dataPartitionResp;
}
/**
@@ -154,10 +155,9 @@ public class PartitionManager {
* List<TimePartitionSlot>>>
* @return DataPartitionDataSet
*/
- public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
+ public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionReq physicalPlan) {
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> noAssignedDataPartitionSlots =
- partitionInfoPersistence.filterNoAssignedDataPartitionSlots(
- physicalPlan.getPartitionSlotsMap());
+ partitionInfo.filterNoAssignedDataPartitionSlots(physicalPlan.getPartitionSlotsMap());
if (noAssignedDataPartitionSlots.size() > 0) {
// Allocate DataPartition
@@ -165,7 +165,7 @@ public class PartitionManager {
assignedDataPartition = allocateDataPartition(noAssignedDataPartitionSlots);
// Persist DataPartition
- CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+ CreateDataPartitionReq createPlan = new CreateDataPartitionReq();
createPlan.setAssignedDataPartition(assignedDataPartition);
getConsensusManager().write(createPlan);
}
@@ -193,7 +193,8 @@ public class PartitionManager {
Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> noAssignedPartitionSlotsMap =
noAssignedDataPartitionSlotsMap.get(storageGroup);
List<TRegionReplicaSet> dataRegionEndPoints =
- RegionInfoPersistence.getInstance().getDataRegionEndPoint(storageGroup);
+ partitionInfo.getRegionReplicaSets(
+ storageGroupInfo.getRegionGroupIds(storageGroup, TConsensusGroupType.DataRegion));
Random random = new Random();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> allocateResult =
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 5e8f37f55a..17d1a24709 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.consensus.response.PermissionInfoDataSet;
-import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
+import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
public class PermissionManager {
@@ -31,12 +31,12 @@ public class PermissionManager {
this.configNodeManager = configManager;
}
- public TSStatus operatePermission(AuthorPlan authorPlan) {
- return getConsensusManager().write(authorPlan).getStatus();
+ public TSStatus operatePermission(AuthorReq authorReq) {
+ return getConsensusManager().write(authorReq).getStatus();
}
- public PermissionInfoDataSet queryPermission(AuthorPlan authorPlan) {
- return (PermissionInfoDataSet) getConsensusManager().read(authorPlan).getDataset();
+ public PermissionInfoResp queryPermission(AuthorReq authorReq) {
+ return (PermissionInfoResp) getConsensusManager().read(authorReq).getDataset();
}
private ConsensusManager getConsensusManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
similarity index 81%
rename from confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfoPersistence.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
index 77d4e2a926..93971e231a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.confignode.consensus.response.PermissionInfoDataSet;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
-import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
+import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -42,9 +42,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public class AuthorInfoPersistence {
+public class AuthorInfo {
- private static final Logger logger = LoggerFactory.getLogger(AuthorInfoPersistence.class);
+ private static final Logger logger = LoggerFactory.getLogger(AuthorInfo.class);
private IAuthorizer authorizer;
@@ -56,14 +56,14 @@ public class AuthorInfoPersistence {
}
}
- public TSStatus authorNonQuery(AuthorPlan authorPlan) throws AuthException {
- PhysicalPlanType authorType = authorPlan.getAuthorType();
- String userName = authorPlan.getUserName();
- String roleName = authorPlan.getRoleName();
- String password = authorPlan.getPassword();
- String newPassword = authorPlan.getNewPassword();
- Set<Integer> permissions = authorPlan.getPermissions();
- String nodeName = authorPlan.getNodeName();
+ public TSStatus authorNonQuery(AuthorReq authorReq) throws AuthException {
+ ConfigRequestType authorType = authorReq.getAuthorType();
+ String userName = authorReq.getUserName();
+ String roleName = authorReq.getRoleName();
+ String password = authorReq.getPassword();
+ String newPassword = authorReq.getNewPassword();
+ Set<Integer> permissions = authorReq.getPermissions();
+ String nodeName = authorReq.getNodeName();
try {
switch (authorType) {
case UPDATE_USER:
@@ -108,16 +108,16 @@ public class AuthorInfoPersistence {
authorizer.revokeRoleFromUser(roleName, userName);
break;
default:
- throw new AuthException("execute " + authorPlan + " failed");
+ throw new AuthException("execute " + authorReq + " failed");
}
} catch (AuthException e) {
- throw new AuthException("execute " + authorPlan + " failed: ", e);
+ throw new AuthException("execute " + authorReq + " failed: ", e);
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- public PermissionInfoDataSet executeListRole() throws AuthException {
- PermissionInfoDataSet result = new PermissionInfoDataSet();
+ public PermissionInfoResp executeListRole() throws AuthException {
+ PermissionInfoResp result = new PermissionInfoResp();
List<String> roleList = authorizer.listAllRoles();
Map<String, List<String>> permissionInfo = new HashMap<>();
permissionInfo.put(IoTDBConstant.COLUMN_ROLE, roleList);
@@ -126,8 +126,8 @@ public class AuthorInfoPersistence {
return result;
}
- public PermissionInfoDataSet executeListUser() throws AuthException {
- PermissionInfoDataSet result = new PermissionInfoDataSet();
+ public PermissionInfoResp executeListUser() throws AuthException {
+ PermissionInfoResp result = new PermissionInfoResp();
List<String> userList = authorizer.listAllUsers();
Map<String, List<String>> permissionInfo = new HashMap<>();
permissionInfo.put(IoTDBConstant.COLUMN_USER, userList);
@@ -136,8 +136,8 @@ public class AuthorInfoPersistence {
return result;
}
- public PermissionInfoDataSet executeListRoleUsers(AuthorPlan plan) throws AuthException {
- PermissionInfoDataSet result = new PermissionInfoDataSet();
+ public PermissionInfoResp executeListRoleUsers(AuthorReq plan) throws AuthException {
+ PermissionInfoResp result = new PermissionInfoResp();
Role role = authorizer.getRole(plan.getRoleName());
if (role == null) {
throw new AuthException("No such role : " + plan.getRoleName());
@@ -157,8 +157,8 @@ public class AuthorInfoPersistence {
return result;
}
- public PermissionInfoDataSet executeListUserRoles(AuthorPlan plan) throws AuthException {
- PermissionInfoDataSet result = new PermissionInfoDataSet();
+ public PermissionInfoResp executeListUserRoles(AuthorReq plan) throws AuthException {
+ PermissionInfoResp result = new PermissionInfoResp();
User user = authorizer.getUser(plan.getUserName());
if (user == null) {
throw new AuthException("No such user : " + plan.getUserName());
@@ -174,8 +174,8 @@ public class AuthorInfoPersistence {
return result;
}
- public PermissionInfoDataSet executeListRolePrivileges(AuthorPlan plan) throws AuthException {
- PermissionInfoDataSet result = new PermissionInfoDataSet();
+ public PermissionInfoResp executeListRolePrivileges(AuthorReq plan) throws AuthException {
+ PermissionInfoResp result = new PermissionInfoResp();
Role role = authorizer.getRole(plan.getRoleName());
if (role == null) {
throw new AuthException("No such role : " + plan.getRoleName());
@@ -194,8 +194,8 @@ public class AuthorInfoPersistence {
return result;
}
- public PermissionInfoDataSet executeListUserPrivileges(AuthorPlan plan) throws AuthException {
- PermissionInfoDataSet result = new PermissionInfoDataSet();
+ public PermissionInfoResp executeListUserPrivileges(AuthorReq plan) throws AuthException {
+ PermissionInfoResp result = new PermissionInfoResp();
User user = authorizer.getUser(plan.getUserName());
if (user == null) {
throw new AuthException("No such user : " + plan.getUserName());
@@ -242,14 +242,14 @@ public class AuthorInfoPersistence {
private static class AuthorInfoPersistenceHolder {
- private static final AuthorInfoPersistence INSTANCE = new AuthorInfoPersistence();
+ private static final AuthorInfo INSTANCE = new AuthorInfo();
private AuthorInfoPersistenceHolder() {
// empty constructor
}
}
- public static AuthorInfoPersistence getInstance() {
- return AuthorInfoPersistence.AuthorInfoPersistenceHolder.INSTANCE;
+ public static AuthorInfo getInstance() {
+ return AuthorInfo.AuthorInfoPersistenceHolder.INSTANCE;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfo.java
similarity index 81%
rename from confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfo.java
index d09b3dc740..3b71d43e5b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfo.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsDataSet;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.read.QueryDataNodeInfoReq;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
import org.apache.iotdb.rpc.TSStatusCode;
import java.util.ArrayList;
@@ -35,14 +35,15 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class DataNodeInfoPersistence {
+public class DataNodeInfo {
private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
// TODO: serialize and deserialize
- private int nextDataNodeId = 0;
+ private AtomicInteger nextDataNodeId = new AtomicInteger(0);
/** online data nodes */
// TODO: serialize and deserialize
@@ -52,7 +53,7 @@ public class DataNodeInfoPersistence {
/** For remove node or draining node */
private final Set<TDataNodeLocation> drainingDataNodes = new HashSet<>();
- private DataNodeInfoPersistence() {
+ private DataNodeInfo() {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
}
@@ -85,13 +86,17 @@ public class DataNodeInfoPersistence {
* @param plan RegisterDataNodePlan
* @return SUCCESS_STATUS
*/
- public TSStatus registerDataNode(RegisterDataNodePlan plan) {
+ public TSStatus registerDataNode(RegisterDataNodeReq plan) {
TSStatus result;
TDataNodeLocation info = plan.getLocation();
dataNodeInfoReadWriteLock.writeLock().lock();
try {
- nextDataNodeId = Math.max(nextDataNodeId, info.getDataNodeId());
onlineDataNodes.put(info.getDataNodeId(), info);
+ if (nextDataNodeId.get() < plan.getLocation().getDataNodeId()) {
+ // In this case, at least one Datanode is registered with the leader node,
+ // so the nextDataNodeID of the followers needs to be added
+ nextDataNodeId.getAndIncrement();
+ }
result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
@@ -106,8 +111,8 @@ public class DataNodeInfoPersistence {
* @return The specific DataNode's info or all DataNode info if dataNodeId in
* QueryDataNodeInfoPlan is -1
*/
- public DataNodeLocationsDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
- DataNodeLocationsDataSet result = new DataNodeLocationsDataSet();
+ public DataNodeLocationsResp getDataNodeInfo(QueryDataNodeInfoReq plan) {
+ DataNodeLocationsResp result = new DataNodeLocationsResp();
result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
int dataNodeId = plan.getDataNodeID();
@@ -161,36 +166,26 @@ public class DataNodeInfoPersistence {
}
public int generateNextDataNodeId() {
- int result;
-
- try {
- dataNodeInfoReadWriteLock.writeLock().lock();
- result = nextDataNodeId;
- nextDataNodeId += 1;
- } finally {
- dataNodeInfoReadWriteLock.writeLock().unlock();
- }
-
- return result;
+ return nextDataNodeId.getAndIncrement();
}
@TestOnly
public void clear() {
- nextDataNodeId = 0;
+ nextDataNodeId = new AtomicInteger(0);
onlineDataNodes.clear();
drainingDataNodes.clear();
}
private static class DataNodeInfoPersistenceHolder {
- private static final DataNodeInfoPersistence INSTANCE = new DataNodeInfoPersistence();
+ private static final DataNodeInfo INSTANCE = new DataNodeInfo();
private DataNodeInfoPersistenceHolder() {
// empty constructor
}
}
- public static DataNodeInfoPersistence getInstance() {
- return DataNodeInfoPersistence.DataNodeInfoPersistenceHolder.INSTANCE;
+ public static DataNodeInfo getInstance() {
+ return DataNodeInfo.DataNodeInfoPersistenceHolder.INSTANCE;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
similarity index 63%
rename from confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
index 348c21efa8..5e21661eca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.persistence;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -27,27 +28,37 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
-import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
-import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** manage data partition and schema partition */
-public class PartitionInfoPersistence {
+public class PartitionInfo {
- /** schema partition read write lock */
+ // TODO: Serialize and Deserialize
+ private AtomicInteger nextRegionGroupId = new AtomicInteger(0);
+
+ // Region read write lock
+ private final ReentrantReadWriteLock regionReadWriteLock;
+ private final Map<TConsensusGroupId, TRegionReplicaSet> regionMap;
+
+ // schema partition read write lock
private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
- /** data partition read write lock */
+ // data partition read write lock
private final ReentrantReadWriteLock dataPartitionReadWriteLock;
// TODO: Serialize and Deserialize
@@ -56,14 +67,18 @@ public class PartitionInfoPersistence {
// TODO: Serialize and Deserialize
private final DataPartition dataPartition;
- public PartitionInfoPersistence() {
+ private PartitionInfo() {
+ this.regionReadWriteLock = new ReentrantReadWriteLock();
+ this.regionMap = new HashMap<>();
+
this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
- this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
this.schemaPartition =
new SchemaPartition(
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
this.schemaPartition.setSchemaPartitionMap(new HashMap<>());
+
+ this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
this.dataPartition =
new DataPartition(
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
@@ -71,25 +86,59 @@ public class PartitionInfoPersistence {
this.dataPartition.setDataPartitionMap(new HashMap<>());
}
+ public int generateNextRegionGroupId() {
+ return nextRegionGroupId.getAndIncrement();
+ }
+
+ /**
+ * Persistence allocation result of new Regions
+ *
+ * @param plan CreateRegionsPlan
+ * @return SUCCESS_STATUS
+ */
+ public TSStatus createRegions(CreateRegionsReq plan) {
+ TSStatus result;
+ regionReadWriteLock.writeLock().lock();
+ try {
+ int maxRegionId = Integer.MIN_VALUE;
+
+ for (TRegionReplicaSet regionReplicaSet : plan.getRegionReplicaSets()) {
+ regionMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+ maxRegionId = Math.max(maxRegionId, regionReplicaSet.getRegionId().getId());
+ }
+
+ if (nextRegionGroupId.get() < maxRegionId) {
+ // In this case, at least one Region is created with the leader node,
+ // so the nextRegionGroupID of the followers needs to be added
+ nextRegionGroupId.getAndAdd(plan.getRegionReplicaSets().size());
+ }
+
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } finally {
+ regionReadWriteLock.writeLock().unlock();
+ }
+ return result;
+ }
+
/**
* Get SchemaPartition
*
* @param physicalPlan SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionDataSet that contains only existing SchemaPartition
*/
- public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
- SchemaPartitionDataSet schemaPartitionDataSet = new SchemaPartitionDataSet();
+ public DataSet getSchemaPartition(GetOrCreateSchemaPartitionReq physicalPlan) {
+ SchemaPartitionResp schemaPartitionResp = new SchemaPartitionResp();
schemaPartitionReadWriteLock.readLock().lock();
try {
- schemaPartitionDataSet.setSchemaPartition(
+ schemaPartitionResp.setSchemaPartition(
schemaPartition.getSchemaPartition(physicalPlan.getPartitionSlotsMap()));
} finally {
schemaPartitionReadWriteLock.readLock().unlock();
- schemaPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ schemaPartitionResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- return schemaPartitionDataSet;
+ return schemaPartitionResp;
}
/**
@@ -98,7 +147,7 @@ public class PartitionInfoPersistence {
* @param physicalPlan CreateSchemaPartitionPlan with SchemaPartition assigned result
* @return TSStatusCode.SUCCESS_STATUS when creation successful
*/
- public TSStatus createSchemaPartition(CreateSchemaPartitionPlan physicalPlan) {
+ public TSStatus createSchemaPartition(CreateSchemaPartitionReq physicalPlan) {
schemaPartitionReadWriteLock.writeLock().lock();
try {
@@ -118,6 +167,13 @@ public class PartitionInfoPersistence {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+ /**
+ * Filter no assigned SchemaPartitionSlots
+ *
+ * @param partitionSlotsMap Map<StorageGroupName, List<TSeriesPartitionSlot>>
+ * @return Map<StorageGroupName, List<TSeriesPartitionSlot>>, SchemaPartitionSlots that is not
+ * assigned in partitionSlotsMap
+ */
public Map<String, List<TSeriesPartitionSlot>> filterNoAssignedSchemaPartitionSlots(
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap) {
Map<String, List<TSeriesPartitionSlot>> result;
@@ -136,22 +192,22 @@ public class PartitionInfoPersistence {
* @param physicalPlan DataPartitionPlan with partitionSlotsMap
* @return DataPartitionDataSet that contains only existing DataPartition
*/
- public DataSet getDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
- DataPartitionDataSet dataPartitionDataSet = new DataPartitionDataSet();
+ public DataSet getDataPartition(GetOrCreateDataPartitionReq physicalPlan) {
+ DataPartitionResp dataPartitionResp = new DataPartitionResp();
dataPartitionReadWriteLock.readLock().lock();
try {
- dataPartitionDataSet.setDataPartition(
+ dataPartitionResp.setDataPartition(
dataPartition.getDataPartition(
physicalPlan.getPartitionSlotsMap(),
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum()));
} finally {
dataPartitionReadWriteLock.readLock().unlock();
- dataPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ dataPartitionResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- return dataPartitionDataSet;
+ return dataPartitionResp;
}
/**
@@ -160,7 +216,7 @@ public class PartitionInfoPersistence {
* @param physicalPlan CreateDataPartitionPlan with DataPartition assigned result
* @return TSStatusCode.SUCCESS_STATUS when creation successful
*/
- public TSStatus createDataPartition(CreateDataPartitionPlan physicalPlan) {
+ public TSStatus createDataPartition(CreateDataPartitionReq physicalPlan) {
dataPartitionReadWriteLock.writeLock().lock();
try {
@@ -187,6 +243,14 @@ public class PartitionInfoPersistence {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+ /**
+ * Filter no assigned DataPartitionSlots
+ *
+ * @param partitionSlotsMap Map<StorageGroupName, Map<TSeriesPartitionSlot,
+ * List<TTimePartitionSlot>>>
+ * @return Map<StorageGroupName, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>,
+ * DataPartitionSlots that is not assigned in partitionSlotsMap
+ */
public Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
filterNoAssignedDataPartitionSlots(
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap) {
@@ -200,8 +264,25 @@ public class PartitionInfoPersistence {
return result;
}
+ /** Get RegionReplicaSet by the specific TConsensusGroupIds */
+ public List<TRegionReplicaSet> getRegionReplicaSets(List<TConsensusGroupId> groupIds) {
+ List<TRegionReplicaSet> result = new ArrayList<>();
+ regionReadWriteLock.readLock().lock();
+ try {
+ for (TConsensusGroupId groupId : groupIds) {
+ result.add(regionMap.get(groupId));
+ }
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
@TestOnly
public void clear() {
+ nextRegionGroupId = new AtomicInteger(0);
+ regionMap.clear();
+
if (schemaPartition.getSchemaPartitionMap() != null) {
schemaPartition.getSchemaPartitionMap().clear();
}
@@ -211,16 +292,16 @@ public class PartitionInfoPersistence {
}
}
- private static class PartitionInfoPersistenceHolder {
+ private static class PartitionInfoHolder {
- private static final PartitionInfoPersistence INSTANCE = new PartitionInfoPersistence();
+ private static final PartitionInfo INSTANCE = new PartitionInfo();
- private PartitionInfoPersistenceHolder() {
+ private PartitionInfoHolder() {
// empty constructor
}
}
- public static PartitionInfoPersistence getInstance() {
- return PartitionInfoPersistence.PartitionInfoPersistenceHolder.INSTANCE;
+ public static PartitionInfo getInstance() {
+ return PartitionInfoHolder.INSTANCE;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
deleted file mode 100644
index 0e426d44a8..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.persistence;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/** manage data partition and schema partition */
-public class RegionInfoPersistence {
-
- // TODO: Serialize and Deserialize
- // Map<StorageGroupName, StorageGroupSchema>
- private final Map<String, TStorageGroupSchema> storageGroupsMap;
-
- // Region allocate lock
- private final ReentrantReadWriteLock regionAllocateLock;
- // TODO: Serialize and Deserialize
- private int nextRegionGroupId = 0;
-
- // Region read write lock
- private final ReentrantReadWriteLock regionReadWriteLock;
- private final Map<TConsensusGroupId, TRegionReplicaSet> regionMap;
-
- public RegionInfoPersistence() {
- this.regionAllocateLock = new ReentrantReadWriteLock();
- this.regionReadWriteLock = new ReentrantReadWriteLock();
- this.storageGroupsMap = new HashMap<>();
- this.regionMap = new HashMap<>();
- }
-
- /**
- * Persistence new StorageGroupSchema
- *
- * @param plan SetStorageGroupPlan
- * @return SUCCESS_STATUS
- */
- public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
- TSStatus result;
- regionReadWriteLock.writeLock().lock();
- try {
- TStorageGroupSchema storageGroupSchema = plan.getSchema();
- storageGroupsMap.put(storageGroupSchema.getName(), storageGroupSchema);
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } finally {
- regionReadWriteLock.writeLock().unlock();
- }
- return result;
- }
-
- public StorageGroupSchemaDataSet getStorageGroupSchema() {
- StorageGroupSchemaDataSet result = new StorageGroupSchemaDataSet();
- regionReadWriteLock.readLock().lock();
- try {
- result.setSchemaList(new ArrayList<>(storageGroupsMap.values()));
- } finally {
- regionReadWriteLock.readLock().unlock();
- result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
- }
- return result;
- }
-
- /**
- * Persistence allocation result of new Regions
- *
- * @param plan CreateRegionsPlan
- * @return SUCCESS_STATUS
- */
- public TSStatus createRegions(CreateRegionsPlan plan) {
- TSStatus result;
- regionReadWriteLock.writeLock().lock();
- regionAllocateLock.writeLock().lock();
- try {
- TStorageGroupSchema storageGroupSchema = storageGroupsMap.get(plan.getStorageGroup());
-
- for (TRegionReplicaSet regionReplicaSet : plan.getRegionReplicaSets()) {
- nextRegionGroupId = Math.max(nextRegionGroupId, regionReplicaSet.getRegionId().getId());
- regionMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
- switch (regionReplicaSet.getRegionId().getType()) {
- case SchemaRegion:
- storageGroupSchema.getSchemaRegionGroupIds().add(regionReplicaSet.getRegionId());
- break;
- case DataRegion:
- storageGroupSchema.getDataRegionGroupIds().add(regionReplicaSet.getRegionId());
- }
- }
-
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } finally {
- regionAllocateLock.writeLock().unlock();
- regionReadWriteLock.writeLock().unlock();
- }
- return result;
- }
-
- /** @return The SchemaRegion ReplicaSets in the specific StorageGroup */
- public List<TRegionReplicaSet> getSchemaRegionEndPoint(String storageGroup) {
- List<TRegionReplicaSet> schemaRegionEndPoints = new ArrayList<>();
- regionReadWriteLock.readLock().lock();
- try {
- if (storageGroupsMap.containsKey(storageGroup)) {
- List<TConsensusGroupId> schemaRegionIds =
- storageGroupsMap.get(storageGroup).getSchemaRegionGroupIds();
- for (TConsensusGroupId consensusGroupId : schemaRegionIds) {
- schemaRegionEndPoints.add(regionMap.get(consensusGroupId));
- }
- }
- } finally {
- regionReadWriteLock.readLock().unlock();
- }
-
- return schemaRegionEndPoints;
- }
-
- /** @return The DataRegion ReplicaSets in the specific StorageGroup */
- public List<TRegionReplicaSet> getDataRegionEndPoint(String storageGroup) {
- List<TRegionReplicaSet> dataRegionEndPoints = new ArrayList<>();
- regionReadWriteLock.readLock().lock();
- try {
- if (storageGroupsMap.containsKey(storageGroup)) {
- List<TConsensusGroupId> dataRegionIds =
- storageGroupsMap.get(storageGroup).getDataRegionGroupIds();
- for (TConsensusGroupId consensusGroupId : dataRegionIds) {
- dataRegionEndPoints.add(regionMap.get(consensusGroupId));
- }
- }
- } finally {
- regionReadWriteLock.readLock().unlock();
- }
-
- return dataRegionEndPoints;
- }
-
- /**
- * Get all StorageGroups' name
- *
- * @return List<String>, all storageGroups' name
- */
- public List<String> getStorageGroupNames() {
- List<String> storageGroups;
- regionReadWriteLock.readLock().lock();
- try {
- storageGroups = new ArrayList<>(storageGroupsMap.keySet());
- } finally {
- regionReadWriteLock.readLock().unlock();
- }
- return storageGroups;
- }
-
- public int generateNextRegionGroupId() {
- int result;
- regionAllocateLock.writeLock().lock();
- try {
- result = nextRegionGroupId;
- nextRegionGroupId += 1;
- } finally {
- regionAllocateLock.writeLock().unlock();
- }
- return result;
- }
-
- public boolean containsStorageGroup(String storageName) {
- boolean result;
- regionReadWriteLock.readLock().lock();
- try {
- result = storageGroupsMap.containsKey(storageName);
- } finally {
- regionReadWriteLock.readLock().unlock();
- }
- return result;
- }
-
- @TestOnly
- public void clear() {
- nextRegionGroupId = 0;
- storageGroupsMap.clear();
- regionMap.clear();
- }
-
- private static class RegionInfoPersistenceHolder {
-
- private static final RegionInfoPersistence INSTANCE = new RegionInfoPersistence();
-
- private RegionInfoPersistenceHolder() {
- // empty constructor
- }
- }
-
- public static RegionInfoPersistence getInstance() {
- return RegionInfoPersistence.RegionInfoPersistenceHolder.INSTANCE;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/StorageGroupInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/StorageGroupInfo.java
new file mode 100644
index 0000000000..c904761a41
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/StorageGroupInfo.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StorageGroupInfo {
+
+ // StorageGroup read write lock
+ private final ReentrantReadWriteLock storageGroupReadWriteLock;
+ // TODO: Serialize and Deserialize
+ // Map<StorageGroupName, StorageGroupSchema>
+ private final Map<String, TStorageGroupSchema> storageGroupsMap;
+
+ private StorageGroupInfo() {
+ storageGroupReadWriteLock = new ReentrantReadWriteLock();
+ storageGroupsMap = new HashMap<>();
+ }
+
+ /**
+ * Persistence new StorageGroupSchema
+ *
+ * @param plan SetStorageGroupPlan
+ * @return SUCCESS_STATUS
+ */
+ public TSStatus setStorageGroup(SetStorageGroupReq plan) {
+ TSStatus result;
+ storageGroupReadWriteLock.writeLock().lock();
+ try {
+ TStorageGroupSchema storageGroupSchema = plan.getSchema();
+ storageGroupsMap.put(storageGroupSchema.getName(), storageGroupSchema);
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } finally {
+ storageGroupReadWriteLock.writeLock().unlock();
+ }
+ return result;
+ }
+
+ /** @return List<StorageGroupName>, all storageGroups' name */
+ public List<String> getStorageGroupNames() {
+ List<String> storageGroups;
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ storageGroups = new ArrayList<>(storageGroupsMap.keySet());
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ return storageGroups;
+ }
+
+ /** @return All the StorageGroupSchema */
+ public StorageGroupSchemaResp getStorageGroupSchema() {
+ StorageGroupSchemaResp result = new StorageGroupSchemaResp();
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ result.setSchemaList(new ArrayList<>(storageGroupsMap.values()));
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+ return result;
+ }
+
+ /** @return True if StorageGroupInfo contains the specific StorageGroup */
+ public boolean containsStorageGroup(String storageName) {
+ boolean result;
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ result = storageGroupsMap.containsKey(storageName);
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ /**
+ * Get the SchemaRegionGroupIds or DataRegionGroupIds from the specific StorageGroup
+ *
+ * @param storageGroup StorageGroupName
+ * @param type SchemaRegion or DataRegion
+ * @return All SchemaRegionGroupIds when type is SchemaRegion, and all DataRegionGroupIds when
+ * type is DataRegion
+ */
+ public List<TConsensusGroupId> getRegionGroupIds(String storageGroup, TConsensusGroupType type) {
+ List<TConsensusGroupId> result;
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ switch (type) {
+ case SchemaRegion:
+ result = storageGroupsMap.get(storageGroup).getSchemaRegionGroupIds();
+ break;
+ case DataRegion:
+ result = storageGroupsMap.get(storageGroup).getDataRegionGroupIds();
+ break;
+ default:
+ result = new ArrayList<>();
+ }
+ } finally {
+ storageGroupReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ @TestOnly
+ public void clear() {
+ storageGroupsMap.clear();
+ }
+
+ private static class StorageGroupInfoHolder {
+
+ private static final StorageGroupInfo INSTANCE = new StorageGroupInfo();
+
+ private StorageGroupInfoHolder() {
+ // Empty constructor
+ }
+ }
+
+ public static StorageGroupInfo getInstance() {
+ return StorageGroupInfoHolder.INSTANCE;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 418ff9a47e..ced863471f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -24,8 +24,9 @@ import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
-import org.apache.iotdb.confignode.service.thrift.server.ConfigNodeRPCServer;
-import org.apache.iotdb.confignode.service.thrift.server.ConfigNodeRPCServerProcessor;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
+import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +43,20 @@ public class ConfigNode implements ConfigNodeMBean {
private final RegisterManager registerManager = new RegisterManager();
+ private final ConfigNodeRPCService configNodeRPCService;
+
+ private ConfigManager configManager;
+
private ConfigNode() {
- // empty constructor
+ this.configNodeRPCService = new ConfigNodeRPCService();
+
+ try {
+ this.configManager = new ConfigManager();
+ } catch (IOException e) {
+ LOGGER.error("Can't start ConfigNode consensus group!", e);
+ stop();
+ System.exit(0);
+ }
}
public static void main(String[] args) {
@@ -53,11 +66,11 @@ public class ConfigNode implements ConfigNodeMBean {
/** Register services */
private void setUp() throws StartupException, IOException {
LOGGER.info("Setting up {}...", ConfigNodeConstant.GLOBAL_NAME);
- registerManager.register(JMXService.getInstance());
+ registerManager.register(new JMXService());
JMXService.registerMBean(getInstance(), mbeanName);
- ConfigNodeRPCServer.getInstance().initSyncedServiceImpl(new ConfigNodeRPCServerProcessor());
- registerManager.register(ConfigNodeRPCServer.getInstance());
+ configNodeRPCService.initSyncedServiceImpl(new ConfigNodeRPCServiceProcessor(configManager));
+ registerManager.register(configNodeRPCService);
LOGGER.info("Init rpc server success");
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index 8e5a34d47c..59220c4c67 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -19,84 +19,84 @@
package org.apache.iotdb.confignode.service.executor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.QueryDataNodeInfoReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
-import org.apache.iotdb.confignode.persistence.AuthorInfoPersistence;
-import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
-import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
-import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.confignode.persistence.AuthorInfo;
+import org.apache.iotdb.confignode.persistence.DataNodeInfo;
+import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.StorageGroupInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.auth.AuthException;
public class PlanExecutor {
- private final DataNodeInfoPersistence dataNodeInfoPersistence;
+ private final DataNodeInfo dataNodeInfo;
- private final RegionInfoPersistence regionInfoPersistence;
+ private final StorageGroupInfo storageGroupInfo;
- private final PartitionInfoPersistence partitionInfoPersistence;
+ private final PartitionInfo partitionInfo;
- private final AuthorInfoPersistence authorInfoPersistence;
+ private final AuthorInfo authorInfo;
public PlanExecutor() {
- this.dataNodeInfoPersistence = DataNodeInfoPersistence.getInstance();
- this.regionInfoPersistence = RegionInfoPersistence.getInstance();
- this.partitionInfoPersistence = PartitionInfoPersistence.getInstance();
- this.authorInfoPersistence = AuthorInfoPersistence.getInstance();
+ this.dataNodeInfo = DataNodeInfo.getInstance();
+ this.storageGroupInfo = StorageGroupInfo.getInstance();
+ this.partitionInfo = PartitionInfo.getInstance();
+ this.authorInfo = AuthorInfo.getInstance();
}
- public DataSet executorQueryPlan(PhysicalPlan plan)
+ public DataSet executorQueryPlan(ConfigRequest plan)
throws UnknownPhysicalPlanTypeException, AuthException {
switch (plan.getType()) {
case QueryDataNodeInfo:
- return dataNodeInfoPersistence.getDataNodeInfo((QueryDataNodeInfoPlan) plan);
+ return dataNodeInfo.getDataNodeInfo((QueryDataNodeInfoReq) plan);
case QueryStorageGroupSchema:
- return regionInfoPersistence.getStorageGroupSchema();
+ return storageGroupInfo.getStorageGroupSchema();
case GetDataPartition:
case GetOrCreateDataPartition:
- return partitionInfoPersistence.getDataPartition((GetOrCreateDataPartitionPlan) plan);
+ return partitionInfo.getDataPartition((GetOrCreateDataPartitionReq) plan);
case GetSchemaPartition:
case GetOrCreateSchemaPartition:
- return partitionInfoPersistence.getSchemaPartition((GetOrCreateSchemaPartitionPlan) plan);
+ return partitionInfo.getSchemaPartition((GetOrCreateSchemaPartitionReq) plan);
case LIST_USER:
- return authorInfoPersistence.executeListUser();
+ return authorInfo.executeListUser();
case LIST_ROLE:
- return authorInfoPersistence.executeListRole();
+ return authorInfo.executeListRole();
case LIST_USER_PRIVILEGE:
- return authorInfoPersistence.executeListUserPrivileges((AuthorPlan) plan);
+ return authorInfo.executeListUserPrivileges((AuthorReq) plan);
case LIST_ROLE_PRIVILEGE:
- return authorInfoPersistence.executeListRolePrivileges((AuthorPlan) plan);
+ return authorInfo.executeListRolePrivileges((AuthorReq) plan);
case LIST_USER_ROLES:
- return authorInfoPersistence.executeListUserRoles((AuthorPlan) plan);
+ return authorInfo.executeListUserRoles((AuthorReq) plan);
case LIST_ROLE_USERS:
- return authorInfoPersistence.executeListRoleUsers((AuthorPlan) plan);
+ return authorInfo.executeListRoleUsers((AuthorReq) plan);
default:
throw new UnknownPhysicalPlanTypeException(plan.getType());
}
}
- public TSStatus executorNonQueryPlan(PhysicalPlan plan)
+ public TSStatus executorNonQueryPlan(ConfigRequest plan)
throws UnknownPhysicalPlanTypeException, AuthException {
switch (plan.getType()) {
case RegisterDataNode:
- return dataNodeInfoPersistence.registerDataNode((RegisterDataNodePlan) plan);
+ return dataNodeInfo.registerDataNode((RegisterDataNodeReq) plan);
case SetStorageGroup:
- return regionInfoPersistence.setStorageGroup((SetStorageGroupPlan) plan);
+ return storageGroupInfo.setStorageGroup((SetStorageGroupReq) plan);
case CreateRegions:
- return regionInfoPersistence.createRegions((CreateRegionsPlan) plan);
+ return partitionInfo.createRegions((CreateRegionsReq) plan);
case CreateSchemaPartition:
- return partitionInfoPersistence.createSchemaPartition((CreateSchemaPartitionPlan) plan);
+ return partitionInfo.createSchemaPartition((CreateSchemaPartitionReq) plan);
case CreateDataPartition:
- return partitionInfoPersistence.createDataPartition((CreateDataPartitionPlan) plan);
+ return partitionInfo.createDataPartition((CreateDataPartitionReq) plan);
case CREATE_USER:
case CREATE_ROLE:
case DROP_USER:
@@ -108,7 +108,7 @@ public class PlanExecutor {
case REVOKE_ROLE:
case REVOKE_ROLE_FROM_USER:
case UPDATE_USER:
- return authorInfoPersistence.authorNonQuery((AuthorPlan) plan);
+ return authorInfo.authorNonQuery((AuthorReq) plan);
default:
throw new UnknownPhysicalPlanTypeException(plan.getType());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
similarity index 76%
rename from confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
index 2c3542b5df..90016df031 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.service.thrift.server;
+package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -29,19 +29,19 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
-public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCServerMBean {
+public class ConfigNodeRPCService extends ThriftService implements ConfigNodeRPCServiceMBean {
private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
- private ConfigNodeRPCServerProcessor configNodeRPCServerProcessor;
+ private ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor;
- private ConfigNodeRPCServer() {
- // empty constructor
+ public ConfigNodeRPCService() {
+ // Empty constructor
}
@Override
public ThriftService getImplementation() {
- return ConfigNodeRPCServer.getInstance();
+ return this;
}
@Override
@@ -51,17 +51,18 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
@Override
public void initSyncedServiceImpl(Object configNodeRPCServerProcessor) {
- this.configNodeRPCServerProcessor = (ConfigNodeRPCServerProcessor) configNodeRPCServerProcessor;
+ this.configNodeRPCServiceProcessor =
+ (ConfigNodeRPCServiceProcessor) configNodeRPCServerProcessor;
super.mbeanName =
String.format(
"%s:%s=%s", this.getClass().getPackage(), IoTDBConstant.JMX_TYPE, getID().getJmxName());
- super.initSyncedServiceImpl(this.configNodeRPCServerProcessor);
+ super.initSyncedServiceImpl(this.configNodeRPCServiceProcessor);
}
@Override
public void initTProcessor() throws InstantiationException {
- processor = new ConfigIService.Processor<>(configNodeRPCServerProcessor);
+ processor = new ConfigIService.Processor<>(configNodeRPCServiceProcessor);
}
@Override
@@ -77,7 +78,7 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
getBindPort(),
conf.getRpcMaxConcurrentClientNum(),
conf.getThriftServerAwaitTimeForStopService(),
- new ConfigNodeRPCServiceHandler(configNodeRPCServerProcessor),
+ new ConfigNodeRPCServiceHandler(configNodeRPCServiceProcessor),
conf.isRpcThriftCompressionEnabled());
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
@@ -94,17 +95,4 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
public int getBindPort() {
return conf.getRpcPort();
}
-
- public static ConfigNodeRPCServer getInstance() {
- return ConfigNodeRPCServerHolder.INSTANCE;
- }
-
- private static class ConfigNodeRPCServerHolder {
-
- private static final ConfigNodeRPCServer INSTANCE = new ConfigNodeRPCServer();
-
- private ConfigNodeRPCServerHolder() {
- // empty constructor
- }
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java
similarity index 89%
rename from confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java
index 638885022a..ea81ce222f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java
@@ -14,7 +14,7 @@
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.confignode.service.thrift.server;
+package org.apache.iotdb.confignode.service.thrift;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
@@ -22,9 +22,9 @@ import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
public class ConfigNodeRPCServiceHandler implements TServerEventHandler {
- private final ConfigNodeRPCServerProcessor processor;
+ private final ConfigNodeRPCServiceProcessor processor;
- public ConfigNodeRPCServiceHandler(ConfigNodeRPCServerProcessor processor) {
+ public ConfigNodeRPCServiceHandler(ConfigNodeRPCServiceProcessor processor) {
this.processor = processor;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerMBean.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceMBean.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerMBean.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceMBean.java
index 5b9778048a..899009eff3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerMBean.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceMBean.java
@@ -17,6 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.confignode.service.thrift.server;
+package org.apache.iotdb.confignode.service.thrift;
-public interface ConfigNodeRPCServerMBean {}
+public interface ConfigNodeRPCServiceMBean {}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
similarity index 70%
rename from confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 99c1de0334..be2d6031ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.service.thrift.server;
+package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsDataSet;
-import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
-import org.apache.iotdb.confignode.consensus.response.PermissionInfoDataSet;
-import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
-import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.QueryDataNodeInfoReq;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
+import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
@@ -62,14 +62,14 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
-public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
+public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeRPCServerProcessor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeRPCServiceProcessor.class);
private final ConfigManager configManager;
- public ConfigNodeRPCServerProcessor() throws IOException {
- this.configManager = new ConfigManager();
+ public ConfigNodeRPCServiceProcessor(ConfigManager configManager) throws IOException {
+ this.configManager = configManager;
}
public void close() throws IOException {
@@ -78,24 +78,24 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
@Override
public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
- RegisterDataNodePlan plan = new RegisterDataNodePlan(req.getDataNodeLocation());
- DataNodeConfigurationDataSet dataSet =
- (DataNodeConfigurationDataSet) configManager.registerDataNode(plan);
+ RegisterDataNodeReq registerReq = new RegisterDataNodeReq(req.getDataNodeLocation());
+ DataNodeConfigurationResp registerResp =
+ (DataNodeConfigurationResp) configManager.registerDataNode(registerReq);
TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
- dataSet.convertToRpcDataNodeRegisterResp(resp);
+ registerResp.convertToRpcDataNodeRegisterResp(resp);
LOGGER.info("Execute RegisterDatanodeRequest {} with result {}", resp, req);
return resp;
}
@Override
public TDataNodeLocationResp getDataNodeLocations(int dataNodeID) throws TException {
- QueryDataNodeInfoPlan plan = new QueryDataNodeInfoPlan(dataNodeID);
- DataNodeLocationsDataSet dataSet =
- (DataNodeLocationsDataSet) configManager.getDataNodeInfo(plan);
+ QueryDataNodeInfoReq queryReq = new QueryDataNodeInfoReq(dataNodeID);
+ DataNodeLocationsResp queryResp =
+ (DataNodeLocationsResp) configManager.getDataNodeInfo(queryReq);
TDataNodeLocationResp resp = new TDataNodeLocationResp();
- dataSet.convertToRpcDataNodeLocationResp(resp);
+ queryResp.convertToRpcDataNodeLocationResp(resp);
return resp;
}
@@ -114,8 +114,8 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
storageGroupSchema.setSchemaRegionGroupIds(new ArrayList<>());
storageGroupSchema.setDataRegionGroupIds(new ArrayList<>());
- SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroupSchema);
- return configManager.setStorageGroup(plan);
+ SetStorageGroupReq setReq = new SetStorageGroupReq(storageGroupSchema);
+ return configManager.setStorageGroup(setReq);
}
@Override
@@ -138,11 +138,11 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
@Override
public TStorageGroupSchemaResp getStorageGroupsSchema() throws TException {
- StorageGroupSchemaDataSet dataSet =
- (StorageGroupSchemaDataSet) configManager.getStorageGroupSchema();
+ StorageGroupSchemaResp storageGroupSchemaResp =
+ (StorageGroupSchemaResp) configManager.getStorageGroupSchema();
TStorageGroupSchemaResp resp = new TStorageGroupSchemaResp();
- dataSet.convertToRPCStorageGroupSchemaResp(resp);
+ storageGroupSchemaResp.convertToRPCStorageGroupSchemaResp(resp);
return resp;
}
@@ -150,11 +150,11 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
public TSchemaPartitionResp getSchemaPartition(TSchemaPartitionReq req) throws TException {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
- SchemaPartitionDataSet dataSet =
- (SchemaPartitionDataSet) configManager.getSchemaPartition(patternTree);
+ SchemaPartitionResp schemaResp =
+ (SchemaPartitionResp) configManager.getSchemaPartition(patternTree);
TSchemaPartitionResp resp = new TSchemaPartitionResp();
- dataSet.convertToRpcSchemaPartitionResp(resp);
+ schemaResp.convertToRpcSchemaPartitionResp(resp);
return resp;
}
@@ -163,37 +163,37 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
throws TException {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
- SchemaPartitionDataSet dataSet =
- (SchemaPartitionDataSet) configManager.getOrCreateSchemaPartition(patternTree);
+ SchemaPartitionResp dataResp =
+ (SchemaPartitionResp) configManager.getOrCreateSchemaPartition(patternTree);
TSchemaPartitionResp resp = new TSchemaPartitionResp();
- dataSet.convertToRpcSchemaPartitionResp(resp);
+ dataResp.convertToRpcSchemaPartitionResp(resp);
return resp;
}
@Override
public TDataPartitionResp getDataPartition(TDataPartitionReq req) throws TException {
- GetOrCreateDataPartitionPlan getDataPartitionPlan =
- new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
- getDataPartitionPlan.convertFromRpcTDataPartitionReq(req);
- DataPartitionDataSet dataset =
- (DataPartitionDataSet) configManager.getDataPartition(getDataPartitionPlan);
+ GetOrCreateDataPartitionReq getDataPartitionReq =
+ new GetOrCreateDataPartitionReq(ConfigRequestType.GetDataPartition);
+ getDataPartitionReq.convertFromRpcTDataPartitionReq(req);
+ DataPartitionResp dataResp =
+ (DataPartitionResp) configManager.getDataPartition(getDataPartitionReq);
TDataPartitionResp resp = new TDataPartitionResp();
- dataset.convertToRpcDataPartitionResp(resp);
+ dataResp.convertToRpcDataPartitionResp(resp);
return resp;
}
@Override
public TDataPartitionResp getOrCreateDataPartition(TDataPartitionReq req) throws TException {
- GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan =
- new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetOrCreateDataPartition);
- getOrCreateDataPartitionPlan.convertFromRpcTDataPartitionReq(req);
- DataPartitionDataSet dataset =
- (DataPartitionDataSet) configManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
+ GetOrCreateDataPartitionReq getOrCreateDataPartitionReq =
+ new GetOrCreateDataPartitionReq(ConfigRequestType.GetOrCreateDataPartition);
+ getOrCreateDataPartitionReq.convertFromRpcTDataPartitionReq(req);
+ DataPartitionResp dataResp =
+ (DataPartitionResp) configManager.getOrCreateDataPartition(getOrCreateDataPartitionReq);
TDataPartitionResp resp = new TDataPartitionResp();
- dataset.convertToRpcDataPartitionResp(resp);
+ dataResp.convertToRpcDataPartitionResp(resp);
return resp;
}
@@ -203,12 +203,12 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
|| req.getAuthorType() >= AuthorOperator.AuthorType.values().length) {
throw new IndexOutOfBoundsException("Invalid Author Type ordinal");
}
- AuthorPlan plan = null;
+ AuthorReq plan = null;
try {
plan =
- new AuthorPlan(
- PhysicalPlanType.values()[
- req.getAuthorType() + PhysicalPlanType.AUTHOR.ordinal() + 1],
+ new AuthorReq(
+ ConfigRequestType.values()[
+ req.getAuthorType() + ConfigRequestType.AUTHOR.ordinal() + 1],
req.getUserName(),
req.getRoleName(),
req.getPassword(),
@@ -227,12 +227,12 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
|| req.getAuthorType() >= AuthorOperator.AuthorType.values().length) {
throw new IndexOutOfBoundsException("Invalid Author Type ordinal");
}
- AuthorPlan plan = null;
+ AuthorReq plan = null;
try {
plan =
- new AuthorPlan(
- PhysicalPlanType.values()[
- req.getAuthorType() + PhysicalPlanType.AUTHOR.ordinal() + 1],
+ new AuthorReq(
+ ConfigRequestType.values()[
+ req.getAuthorType() + ConfigRequestType.AUTHOR.ordinal() + 1],
req.getUserName(),
req.getRoleName(),
req.getPassword(),
@@ -242,7 +242,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
} catch (AuthException e) {
LOGGER.error(e.getMessage());
}
- PermissionInfoDataSet dataSet = (PermissionInfoDataSet) configManager.queryPermission(plan);
+ PermissionInfoResp dataSet = (PermissionInfoResp) configManager.queryPermission(plan);
return new TAuthorizerResp(dataSet.getStatus(), dataSet.getPermissionInfo());
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/physical/PhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/physical/ConfigRequestSerDeTest.java
similarity index 71%
rename from confignode/src/test/java/org/apache/iotdb/confignode/physical/PhysicalPlanSerDeTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/physical/ConfigRequestSerDeTest.java
index 78386bee94..79a6cfaafa 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/physical/PhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/physical/ConfigRequestSerDeTest.java
@@ -25,15 +25,17 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
-import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
-import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
-import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.read.QueryDataNodeInfoReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.entity.PrivilegeType;
@@ -52,7 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public class PhysicalPlanSerDeTest {
+public class ConfigRequestSerDeTest {
private final ByteBuffer buffer = ByteBuffer.allocate(10240);
@@ -69,26 +71,26 @@ public class PhysicalPlanSerDeTest {
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 7777));
- RegisterDataNodePlan plan0 = new RegisterDataNodePlan(dataNodeLocation);
+ RegisterDataNodeReq plan0 = new RegisterDataNodeReq(dataNodeLocation);
plan0.serialize(buffer);
buffer.flip();
- RegisterDataNodePlan plan1 = (RegisterDataNodePlan) PhysicalPlan.Factory.create(buffer);
+ RegisterDataNodeReq plan1 = (RegisterDataNodeReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@Test
public void QueryDataNodeInfoPlanTest() throws IOException {
- QueryDataNodeInfoPlan plan0 = new QueryDataNodeInfoPlan(-1);
+ QueryDataNodeInfoReq plan0 = new QueryDataNodeInfoReq(-1);
plan0.serialize(buffer);
buffer.flip();
- QueryDataNodeInfoPlan plan1 = (QueryDataNodeInfoPlan) PhysicalPlan.Factory.create(buffer);
+ QueryDataNodeInfoReq plan1 = (QueryDataNodeInfoReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@Test
public void SetStorageGroupPlanTest() throws IOException {
- SetStorageGroupPlan plan0 =
- new SetStorageGroupPlan(
+ SetStorageGroupReq plan0 =
+ new SetStorageGroupReq(
new TStorageGroupSchema()
.setName("sg")
.setTTL(Long.MAX_VALUE)
@@ -99,7 +101,7 @@ public class PhysicalPlanSerDeTest {
.setDataRegionGroupIds(new ArrayList<>()));
plan0.serialize(buffer);
buffer.flip();
- SetStorageGroupPlan plan1 = (SetStorageGroupPlan) PhysicalPlan.Factory.create(buffer);
+ SetStorageGroupReq plan1 = (SetStorageGroupReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@@ -117,7 +119,7 @@ public class PhysicalPlanSerDeTest {
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
- CreateRegionsPlan plan0 = new CreateRegionsPlan();
+ CreateRegionsReq plan0 = new CreateRegionsReq();
plan0.setStorageGroup("sg");
TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
dataRegionSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
@@ -131,7 +133,7 @@ public class PhysicalPlanSerDeTest {
plan0.serialize(buffer);
buffer.flip();
- CreateRegionsPlan plan1 = (CreateRegionsPlan) PhysicalPlan.Factory.create(buffer);
+ CreateRegionsReq plan1 = (CreateRegionsReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@@ -155,12 +157,12 @@ public class PhysicalPlanSerDeTest {
assignedSchemaPartition.put(storageGroup, new HashMap<>());
assignedSchemaPartition.get(storageGroup).put(seriesPartitionSlot, regionReplicaSet);
- CreateSchemaPartitionPlan plan0 = new CreateSchemaPartitionPlan();
+ CreateSchemaPartitionReq plan0 = new CreateSchemaPartitionReq();
plan0.setAssignedSchemaPartition(assignedSchemaPartition);
plan0.serialize(buffer);
buffer.flip();
- CreateSchemaPartitionPlan plan1 =
- (CreateSchemaPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ CreateSchemaPartitionReq plan1 =
+ (CreateSchemaPartitionReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@@ -172,13 +174,13 @@ public class PhysicalPlanSerDeTest {
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
partitionSlotsMap.put(storageGroup, Collections.singletonList(seriesPartitionSlot));
- GetOrCreateSchemaPartitionPlan plan0 =
- new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
+ GetOrCreateSchemaPartitionReq plan0 =
+ new GetOrCreateSchemaPartitionReq(ConfigRequestType.GetOrCreateSchemaPartition);
plan0.setPartitionSlotsMap(partitionSlotsMap);
plan0.serialize(buffer);
buffer.flip();
- GetOrCreateSchemaPartitionPlan plan1 =
- (GetOrCreateSchemaPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ GetOrCreateSchemaPartitionReq plan1 =
+ (GetOrCreateSchemaPartitionReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@@ -212,11 +214,11 @@ public class PhysicalPlanSerDeTest {
.get(timePartitionSlot)
.add(regionReplicaSet);
- CreateDataPartitionPlan plan0 = new CreateDataPartitionPlan();
+ CreateDataPartitionReq plan0 = new CreateDataPartitionReq();
plan0.setAssignedDataPartition(assignedDataPartition);
plan0.serialize(buffer);
buffer.flip();
- CreateDataPartitionPlan plan1 = (CreateDataPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ CreateDataPartitionReq plan1 = (CreateDataPartitionReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@@ -232,66 +234,66 @@ public class PhysicalPlanSerDeTest {
partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
partitionSlotsMap.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
- GetOrCreateDataPartitionPlan plan0 =
- new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+ GetOrCreateDataPartitionReq plan0 =
+ new GetOrCreateDataPartitionReq(ConfigRequestType.GetDataPartition);
plan0.setPartitionSlotsMap(partitionSlotsMap);
plan0.serialize(buffer);
buffer.flip();
- GetOrCreateDataPartitionPlan plan1 =
- (GetOrCreateDataPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ GetOrCreateDataPartitionReq plan1 =
+ (GetOrCreateDataPartitionReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
}
@Test
public void AuthorPlanTest() throws IOException, AuthException {
- AuthorPlan plan0 = null;
- AuthorPlan plan1 = null;
+ AuthorReq plan0 = null;
+ AuthorReq plan1 = null;
Set<Integer> permissions = new HashSet<>();
permissions.add(PrivilegeType.GRANT_USER_PRIVILEGE.ordinal());
permissions.add(PrivilegeType.REVOKE_USER_ROLE.ordinal());
// create user
plan0 =
- new AuthorPlan(
- PhysicalPlanType.CREATE_USER, "thulab", "", "passwd", "", new HashSet<>(), "");
+ new AuthorReq(
+ ConfigRequestType.CREATE_USER, "thulab", "", "passwd", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// create role
- plan0 = new AuthorPlan(PhysicalPlanType.CREATE_ROLE, "", "admin", "", "", new HashSet<>(), "");
+ plan0 = new AuthorReq(ConfigRequestType.CREATE_ROLE, "", "admin", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// alter user
plan0 =
- new AuthorPlan(
- PhysicalPlanType.UPDATE_USER, "tempuser", "", "", "newpwd", new HashSet<>(), "");
+ new AuthorReq(
+ ConfigRequestType.UPDATE_USER, "tempuser", "", "", "newpwd", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// grant user
plan0 =
- new AuthorPlan(PhysicalPlanType.GRANT_USER, "tempuser", "", "", "", permissions, "root.ln");
+ new AuthorReq(ConfigRequestType.GRANT_USER, "tempuser", "", "", "", permissions, "root.ln");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// grant role
plan0 =
- new AuthorPlan(
- PhysicalPlanType.GRANT_ROLE_TO_USER,
+ new AuthorReq(
+ ConfigRequestType.GRANT_ROLE_TO_USER,
"tempuser",
"temprole",
"",
@@ -300,43 +302,43 @@ public class PhysicalPlanSerDeTest {
"root.ln");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// grant role to user
plan0 =
- new AuthorPlan(PhysicalPlanType.GRANT_ROLE, "", "temprole", "", "", new HashSet<>(), "");
+ new AuthorReq(ConfigRequestType.GRANT_ROLE, "", "temprole", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// revoke user
plan0 =
- new AuthorPlan(
- PhysicalPlanType.REVOKE_USER, "tempuser", "", "", "", permissions, "root.ln");
+ new AuthorReq(
+ ConfigRequestType.REVOKE_USER, "tempuser", "", "", "", permissions, "root.ln");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// revoke role
plan0 =
- new AuthorPlan(
- PhysicalPlanType.REVOKE_ROLE, "", "temprole", "", "", permissions, "root.ln");
+ new AuthorReq(
+ ConfigRequestType.REVOKE_ROLE, "", "temprole", "", "", permissions, "root.ln");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// revoke role from user
plan0 =
- new AuthorPlan(
- PhysicalPlanType.REVOKE_ROLE_FROM_USER,
+ new AuthorReq(
+ ConfigRequestType.REVOKE_ROLE_FROM_USER,
"tempuser",
"temprole",
"",
@@ -345,91 +347,91 @@ public class PhysicalPlanSerDeTest {
"");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// drop user
- plan0 = new AuthorPlan(PhysicalPlanType.DROP_USER, "xiaoming", "", "", "", new HashSet<>(), "");
+ plan0 = new AuthorReq(ConfigRequestType.DROP_USER, "xiaoming", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// drop role
- plan0 = new AuthorPlan(PhysicalPlanType.DROP_ROLE, "", "admin", "", "", new HashSet<>(), "");
+ plan0 = new AuthorReq(ConfigRequestType.DROP_ROLE, "", "admin", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list user
- plan0 = new AuthorPlan(PhysicalPlanType.LIST_USER, "", "", "", "", new HashSet<>(), "");
+ plan0 = new AuthorReq(ConfigRequestType.LIST_USER, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list role
- plan0 = new AuthorPlan(PhysicalPlanType.LIST_ROLE, "", "", "", "", new HashSet<>(), "");
+ plan0 = new AuthorReq(ConfigRequestType.LIST_ROLE, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list privileges user
plan0 =
- new AuthorPlan(PhysicalPlanType.LIST_USER_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
+ new AuthorReq(ConfigRequestType.LIST_USER_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list privileges role
plan0 =
- new AuthorPlan(PhysicalPlanType.LIST_ROLE_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
+ new AuthorReq(ConfigRequestType.LIST_ROLE_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list user privileges
plan0 =
- new AuthorPlan(PhysicalPlanType.LIST_USER_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
+ new AuthorReq(ConfigRequestType.LIST_USER_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list role privileges
plan0 =
- new AuthorPlan(PhysicalPlanType.LIST_ROLE_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
+ new AuthorReq(ConfigRequestType.LIST_ROLE_PRIVILEGE, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list all role of user
- plan0 = new AuthorPlan(PhysicalPlanType.LIST_USER_ROLES, "", "", "", "", new HashSet<>(), "");
+ plan0 = new AuthorReq(ConfigRequestType.LIST_USER_ROLES, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
// list all user of role
- plan0 = new AuthorPlan(PhysicalPlanType.LIST_ROLE_USERS, "", "", "", "", new HashSet<>(), "");
+ plan0 = new AuthorReq(ConfigRequestType.LIST_ROLE_USERS, "", "", "", "", new HashSet<>(), "");
plan0.serialize(buffer);
buffer.flip();
- plan1 = (AuthorPlan) PhysicalPlan.Factory.create(buffer);
+ plan1 = (AuthorReq) ConfigRequest.Factory.create(buffer);
Assert.assertEquals(plan0, plan1);
cleanBuffer();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
similarity index 98%
rename from confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index e40f601a01..baf700f8c0 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.service.thrift.server;
+package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -27,9 +27,10 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
-import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
-import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.DataNodeInfo;
+import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.StorageGroupInfo;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeLocationResp;
@@ -70,22 +71,22 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-public class ConfigNodeRPCServerProcessorTest {
+public class ConfigNodeRPCServiceProcessorTest {
- ConfigNodeRPCServerProcessor processor;
+ ConfigNodeRPCServiceProcessor processor;
@Before
public void before() throws IOException, InterruptedException {
- processor = new ConfigNodeRPCServerProcessor();
+ processor = new ConfigNodeRPCServiceProcessor(new ConfigManager());
// Sleep 1s to make sure the Consensus group has done leader election
TimeUnit.SECONDS.sleep(1);
}
@After
public void after() throws IOException {
- DataNodeInfoPersistence.getInstance().clear();
- PartitionInfoPersistence.getInstance().clear();
- RegionInfoPersistence.getInstance().clear();
+ DataNodeInfo.getInstance().clear();
+ StorageGroupInfo.getInstance().clear();
+ PartitionInfo.getInstance().clear();
processor.close();
FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/JMXService.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/JMXService.java
index c9178852bc..4f0bef1f3e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/JMXService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/JMXService.java
@@ -37,10 +37,8 @@ public class JMXService implements IService {
private static final Logger logger = LoggerFactory.getLogger(JMXService.class);
- private JMXService() {}
-
- public static JMXService getInstance() {
- return JMXServerHolder.INSTANCE;
+ public JMXService() {
+ // Empty constructor
}
/** function for registering MBean. */
@@ -91,11 +89,4 @@ public class JMXService implements IService {
public void stop() {
// do nothing.
}
-
- private static class JMXServerHolder {
-
- private static final JMXService INSTANCE = new JMXService();
-
- private JMXServerHolder() {}
- }
}
diff --git a/procedure/src/main/java/org/apache/iotdb/procedure/service/ProcedureNode.java b/procedure/src/main/java/org/apache/iotdb/procedure/service/ProcedureNode.java
index 7fbdabd611..9627a9c986 100644
--- a/procedure/src/main/java/org/apache/iotdb/procedure/service/ProcedureNode.java
+++ b/procedure/src/main/java/org/apache/iotdb/procedure/service/ProcedureNode.java
@@ -52,7 +52,7 @@ public class ProcedureNode implements ProcedureNodeMBean {
public void setUp() throws StartupException, IOException {
LOGGER.info("Setting up {}...", ProcedureNodeConstant.GLOBAL_NAME);
- registerManager.register(JMXService.getInstance());
+ registerManager.register(new JMXService());
JMXService.registerMBean(getInstance(), mbeanName);
ProcedureServer.getInstance().initSyncedServiceImpl(new ProcedureServerProcessor());
registerManager.register(ProcedureServer.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 2bebd7196e..f281b1f4b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -134,7 +134,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(MetricsService.getInstance());
logger.info("recover the schema...");
initConfigManager();
- registerManager.register(JMXService.getInstance());
+ registerManager.register(new JMXService());
registerManager.register(FlushManager.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(CompactionTaskManager.getInstance());