You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/25 12:05:19 UTC
[iotdb] branch rename_multileader_iotconsensus updated: rename MultiLeaderConsensus to IoTConsensus
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rename_multileader_iotconsensus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rename_multileader_iotconsensus by this push:
new 38e26b4ff4 rename MultiLeaderConsensus to IoTConsensus
38e26b4ff4 is described below
commit 38e26b4ff4a8609a18ccc7dfb43cb4daff6d7703
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Nov 25 20:05:08 2022 +0800
rename MultiLeaderConsensus to IoTConsensus
---
RELEASE_NOTES.md | 2 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 5 +-
.../manager/load/balancer/RouteBalancer.java | 17 ++--
.../procedure/env/ConfigNodeProcedureEnv.java | 2 +-
.../procedure/env/DataNodeRemoveHandler.java | 10 +-
consensus/pom.xml | 2 +-
.../apache/iotdb/consensus/ConsensusFactory.java | 3 +-
...sensusRequest.java => IoTConsensusRequest.java} | 12 +--
.../iotdb/consensus/config/ConsensusConfig.java | 20 ++--
...tiLeaderConfig.java => IoTConsensusConfig.java} | 8 +-
.../IoTConsensus.java} | 96 +++++++++---------
.../IoTConsensusServerImpl.java} | 107 +++++++++++----------
.../IoTConsensusServerMetrics.java} | 12 +--
.../client/AsyncIoTConsensusServiceClient.java} | 27 +++---
.../client/DispatchLogHandler.java | 8 +-
.../client/IoTConsensusClientPool.java} | 45 +++++----
.../client/SyncIoTConsensusServiceClient.java} | 32 +++---
.../logdispatcher/IndexController.java | 2 +-
.../logdispatcher/IoTConsensusMemoryManager.java} | 14 +--
.../IoTConsensusMemoryManagerMetrics.java} | 14 +--
.../logdispatcher/LogDispatcher.java | 50 +++++-----
.../logdispatcher/LogDispatcherThreadMetrics.java | 2 +-
.../logdispatcher/PendingBatch.java | 10 +-
.../logdispatcher/SyncStatus.java | 18 ++--
.../service/IoTConsensusRPCService.java} | 32 +++---
.../service/IoTConsensusRPCServiceHandler.java} | 8 +-
.../service/IoTConsensusRPCServiceMBean.java} | 4 +-
.../service/IoTConsensusRPCServiceProcessor.java} | 76 +++++++--------
.../snapshot/SnapshotFragment.java | 4 +-
.../snapshot/SnapshotFragmentReader.java | 2 +-
.../wal/ConsensusReqReader.java | 2 +-
.../wal/GetConsensusReqReaderPlan.java | 2 +-
.../IoTConsensusTest.java} | 20 ++--
.../{multileader => iot}/RecoveryTest.java | 8 +-
.../logdispatcher/IndexControllerTest.java | 2 +-
.../logdispatcher/SyncStatusTest.java | 18 ++--
.../util/FakeConsensusReqReader.java | 4 +-
.../{multileader => iot}/util/RequestSets.java | 2 +-
.../{multileader => iot}/util/TestEntry.java | 6 +-
.../util/TestStateMachine.java | 6 +-
docs/UserGuide/Cluster/Cluster-Concept.md | 6 +-
docs/zh/UserGuide/Cluster/Cluster-Concept.md | 6 +-
.../it/IoTDBClusterRegionLeaderBalancingIT.java | 3 +-
.../it/IoTDBConfigNodeSwitchLeaderIT.java | 3 +-
.../resources/conf/iotdb-common.properties | 2 +-
.../iotdb/commons/concurrent/ThreadName.java | 4 +-
.../apache/iotdb/commons/service/ServiceType.java | 2 +-
pom.xml | 4 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
.../db/consensus/DataRegionConsensusImpl.java | 10 +-
.../consensus/statemachine/BaseStateMachine.java | 4 +-
.../statemachine/DataRegionStateMachine.java | 8 +-
.../apache/iotdb/db/engine/StorageEngineV2.java | 2 +-
.../iotdb/db/engine/storagegroup/DataRegion.java | 4 +-
.../apache/iotdb/db/mpp/plan/TestRPCClient.java | 26 ++---
.../plan/planner/plan/node/write/InsertNode.java | 2 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 4 +-
.../java/org/apache/iotdb/db/wal/WALManager.java | 16 +--
.../db/wal/allocation/FirstCreateStrategy.java | 2 +-
.../org/apache/iotdb/db/wal/node/IWALNode.java | 2 +-
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 8 +-
.../iotdb/db/wal/recover/WALNodeRecoverTask.java | 6 +-
.../iotdb/db/wal/node/ConsensusReqReaderTest.java | 2 +-
.../pom.xml | 2 +-
.../src/main/thrift/iotconsensus.thrift | 4 +-
65 files changed, 419 insertions(+), 431 deletions(-)
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 420d58c771..6c49710840 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -36,7 +36,7 @@ The 0.14.0-preview2 version only contains the new cluster version.
## Improvements
* Add memory control for query
-* Balance write load of MultiLeader consensus
+* Balance write load of IoT consensus
* Optimize python client query performance
* Optimize c++ client tablet insert performance
* Print detailed info when failed opening session
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index d00f93c301..b70e20bb9c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -82,10 +82,9 @@ public class ConfigNodeStartupCheck {
String.valueOf(1));
}
- // When the schema region consensus protocol is set to MultiLeaderConsensus,
+ // When the schema region consensus protocol is set to IoTConsensus,
// we should report an error
- if (CONF.getSchemaRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
throw new ConfigurationException(
"schema_region_consensus_protocol_class",
String.valueOf(CONF.getSchemaRegionConsensusProtocolClass()),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 6e0a7d95a3..f9638088ac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -76,8 +76,8 @@ public class RouteBalancer {
CONF.getSchemaRegionConsensusProtocolClass();
private static final String DATA_REGION_CONSENSUS_PROTOCOL_CLASS =
CONF.getDataRegionConsensusProtocolClass();
- private static final boolean isMultiLeader =
- ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass());
+ private static final boolean IS_IOT_CONSENSUS =
+ ConsensusFactory.IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass());
private final IManager configManager;
@@ -135,7 +135,7 @@ public class RouteBalancer {
* @param leaderSample <Sample timestamp, leaderDataNodeId>, The newest HeartbeatSample
*/
public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
- if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) && isMultiLeader) {
+ if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) && IS_IOT_CONSENSUS) {
// The leadership of multi-leader consensus protocol is decided by ConfigNode-leader
return;
}
@@ -163,8 +163,8 @@ public class RouteBalancer {
AtomicBoolean isLeaderChanged = new AtomicBoolean(false);
leaderCache.forEach(
(regionGroupId, leadershipSample) -> {
- if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) && isMultiLeader) {
- // Ignore MultiLeader consensus protocol
+ if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) && IS_IOT_CONSENSUS) {
+ // Ignore the IoTConsensus protocol
return;
}
@@ -322,8 +322,7 @@ public class RouteBalancer {
}
}
- public void changeLeaderForMultiLeaderConsensus(
- TConsensusGroupId regionGroupId, int newLeaderId) {
+ public void changeLeaderForIoTConsensus(TConsensusGroupId regionGroupId, int newLeaderId) {
regionRouteMap.setLeader(regionGroupId, newLeaderId);
}
@@ -334,7 +333,7 @@ public class RouteBalancer {
TConsensusGroupId regionGroupId,
TDataNodeLocation newLeader) {
switch (consensusProtocolClass) {
- case ConsensusFactory.MULTI_LEADER_CONSENSUS:
+ case ConsensusFactory.IOT_CONSENSUS:
// For multi-leader protocol, change RegionRouteMap is enough.
// And the result will be broadcast by Cluster-LoadStatistics-Service soon.
regionRouteMap.setLeader(regionGroupId, newLeader.getDataNodeId());
@@ -358,7 +357,7 @@ public class RouteBalancer {
public void initRegionRouteMap() {
synchronized (regionRouteMap) {
regionRouteMap.clear();
- if (isMultiLeader) {
+ if (IS_IOT_CONSENSUS) {
// Greedily pick leader for all existed DataRegionGroups
List<TRegionReplicaSet> dataRegionGroups =
getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 2283b35dcc..9aa76d9fec 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -548,7 +548,7 @@ public class ConfigNodeProcedureEnv {
// Select leader greedily for multi-leader consensus protocol
if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
- && ConsensusFactory.MULTI_LEADER_CONSENSUS.equals(
+ && ConsensusFactory.IOT_CONSENSUS.equals(
ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass())) {
List<Integer> availableDataNodes = new ArrayList<>();
for (Map.Entry<Integer, RegionStatus> statusEntry : regionStatusMap.entrySet()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 877a627ee2..d1aeccaeab 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -51,7 +51,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
-import static org.apache.iotdb.consensus.ConsensusFactory.MULTI_LEADER_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
public class DataNodeRemoveHandler {
@@ -551,8 +551,8 @@ public class DataNodeRemoveHandler {
/**
* Change the leader of given Region.
*
- * <p>For MULTI_LEADER_CONSENSUS, using `changeLeaderForMultiLeaderConsensus` method to change the
- * regionLeaderMap maintained in ConfigNode.
+ * <p>For IOT_CONSENSUS, using `changeLeaderForIoTConsensus` method to change the regionLeaderMap
+ * maintained in ConfigNode.
*
* <p>For RATIS_CONSENSUS, invoking `changeRegionLeader` DataNode RPC method to change the leader.
*
@@ -568,7 +568,7 @@ public class DataNodeRemoveHandler {
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
if (TConsensusGroupType.DataRegion.equals(regionId.getType())
- && MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+ && IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
if (CONF.getDataReplicationFactor() == 1) {
newLeaderNode = Optional.of(migrateDestDataNode);
}
@@ -576,7 +576,7 @@ public class DataNodeRemoveHandler {
configManager
.getLoadManager()
.getRouteBalancer()
- .changeLeaderForMultiLeaderConsensus(regionId, newLeaderNode.get().getDataNodeId());
+ .changeLeaderForIoTConsensus(regionId, newLeaderNode.get().getDataNodeId());
LOGGER.info(
"{}, Change region leader finished for MULTI_LEADER_CONSENSUS, regionId: {}, newLeaderNode: {}",
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 618087fb2c..0839906e72 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -69,7 +69,7 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
- <artifactId>thrift-multi-leader-consensus</artifactId>
+ <artifactId>thrift-iot-consensus</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index e9271e868f..c19dbf11a9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -34,8 +34,7 @@ public class ConsensusFactory {
public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus";
public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
- public static final String MULTI_LEADER_CONSENSUS =
- "org.apache.iotdb.consensus.multileader.MultiLeaderConsensus";
+ public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IoTConsensusRequest.java
similarity index 69%
rename from consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/common/request/IoTConsensusRequest.java
index 10c761743d..3b55016d8b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IoTConsensusRequest.java
@@ -22,16 +22,16 @@ package org.apache.iotdb.consensus.common.request;
import java.nio.ByteBuffer;
/**
- * This class is used to represent the sync log request from MultiLeaderConsensus. That we use this
- * class rather than ByteBufferConsensusRequest is because the serialization method is different
- * between these two classes. And we need to separate them in DataRegionStateMachine when
- * deserialize the PlanNode from ByteBuffer
+ * This class represents the sync log request from IoTConsensus. We use this class rather than
+ * ByteBufferConsensusRequest because the two classes are serialized differently, and
+ * DataRegionStateMachine needs to tell them apart when deserializing the PlanNode from the
+ * ByteBuffer.
*/
-public class MultiLeaderConsensusRequest implements IConsensusRequest {
+public class IoTConsensusRequest implements IConsensusRequest {
private final ByteBuffer byteBuffer;
- public MultiLeaderConsensusRequest(ByteBuffer byteBuffer) {
+ public IoTConsensusRequest(ByteBuffer byteBuffer) {
this.byteBuffer = byteBuffer;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index 42527275ea..fb82dae2a9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -27,19 +27,19 @@ public class ConsensusConfig {
private final int thisNodeId;
private final String storageDir;
private final RatisConfig ratisConfig;
- private final MultiLeaderConfig multiLeaderConfig;
+ private final IoTConsensusConfig ioTConsensusConfig;
private ConsensusConfig(
TEndPoint thisNode,
int thisNodeId,
String storageDir,
RatisConfig ratisConfig,
- MultiLeaderConfig multiLeaderConfig) {
+ IoTConsensusConfig ioTConsensusConfig) {
this.thisNodeEndPoint = thisNode;
this.thisNodeId = thisNodeId;
this.storageDir = storageDir;
this.ratisConfig = ratisConfig;
- this.multiLeaderConfig = multiLeaderConfig;
+ this.ioTConsensusConfig = ioTConsensusConfig;
}
public TEndPoint getThisNodeEndPoint() {
@@ -58,8 +58,8 @@ public class ConsensusConfig {
return ratisConfig;
}
- public MultiLeaderConfig getMultiLeaderConfig() {
- return multiLeaderConfig;
+ public IoTConsensusConfig getIoTConsensusConfig() {
+ return ioTConsensusConfig;
}
public static ConsensusConfig.Builder newBuilder() {
@@ -72,7 +72,7 @@ public class ConsensusConfig {
private int thisNodeId;
private String storageDir;
private RatisConfig ratisConfig;
- private MultiLeaderConfig multiLeaderConfig;
+ private IoTConsensusConfig ioTConsensusConfig;
public ConsensusConfig build() {
return new ConsensusConfig(
@@ -80,7 +80,9 @@ public class ConsensusConfig {
thisNodeId,
storageDir,
ratisConfig != null ? ratisConfig : RatisConfig.newBuilder().build(),
- multiLeaderConfig != null ? multiLeaderConfig : MultiLeaderConfig.newBuilder().build());
+ ioTConsensusConfig != null
+ ? ioTConsensusConfig
+ : IoTConsensusConfig.newBuilder().build());
}
public Builder setThisNode(TEndPoint thisNode) {
@@ -103,8 +105,8 @@ public class ConsensusConfig {
return this;
}
- public Builder setMultiLeaderConfig(MultiLeaderConfig multiLeaderConfig) {
- this.multiLeaderConfig = multiLeaderConfig;
+ public Builder setIoTConsensusConfig(IoTConsensusConfig ioTConsensusConfig) {
+ this.ioTConsensusConfig = ioTConsensusConfig;
return this;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index af5a340e9b..9b9bc1561c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -21,12 +21,12 @@ package org.apache.iotdb.consensus.config;
import java.util.concurrent.TimeUnit;
-public class MultiLeaderConfig {
+public class IoTConsensusConfig {
private final RPC rpc;
private final Replication replication;
- private MultiLeaderConfig(RPC rpc, Replication replication) {
+ private IoTConsensusConfig(RPC rpc, Replication replication) {
this.rpc = rpc;
this.replication = replication;
}
@@ -48,8 +48,8 @@ public class MultiLeaderConfig {
private RPC rpc;
private Replication replication;
- public MultiLeaderConfig build() {
- return new MultiLeaderConfig(
+ public IoTConsensusConfig build() {
+ return new IoTConsensusConfig(
rpc != null ? rpc : new RPC.Builder().build(),
replication != null ? replication : new Replication.Builder().build());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
similarity index 78%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 12e601b37b..9986b38bd1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader;
+package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -35,20 +35,20 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.logdispatcher.MultiLeaderMemoryManager;
-import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
-import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
+import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.AsyncIoTConsensusServiceClientPoolFactory;
+import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory;
+import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
+import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
+import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -66,48 +66,48 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-public class MultiLeaderConsensus implements IConsensus {
+public class IoTConsensus implements IConsensus {
- private final Logger logger = LoggerFactory.getLogger(MultiLeaderConsensus.class);
+ private final Logger logger = LoggerFactory.getLogger(IoTConsensus.class);
private final TEndPoint thisNode;
private final int thisNodeId;
private final File storageDir;
private final IStateMachine.Registry registry;
- private final Map<ConsensusGroupId, MultiLeaderServerImpl> stateMachineMap =
+ private final Map<ConsensusGroupId, IoTConsensusServerImpl> stateMachineMap =
new ConcurrentHashMap<>();
- private final MultiLeaderRPCService service;
+ private final IoTConsensusRPCService service;
private final RegisterManager registerManager = new RegisterManager();
- private final MultiLeaderConfig config;
- private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
- private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
+ private final IoTConsensusConfig config;
+ private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
+ private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
- public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
+ public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
this.thisNodeId = config.getThisNodeId();
this.storageDir = new File(config.getStorageDir());
- this.config = config.getMultiLeaderConfig();
+ this.config = config.getIoTConsensusConfig();
this.registry = registry;
- this.service = new MultiLeaderRPCService(thisNode, config.getMultiLeaderConfig());
+ this.service = new IoTConsensusRPCService(thisNode, config.getIoTConsensusConfig());
this.clientManager =
- new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>()
+ new IClientManager.Factory<TEndPoint, AsyncIoTConsensusServiceClient>()
.createClientManager(
- new AsyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
+ new AsyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
this.syncClientManager =
- new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
+ new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
.createClientManager(
- new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
- // init multiLeader memory manager
- MultiLeaderMemoryManager.getInstance()
+ new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
+ // init IoTConsensus memory manager
+ IoTConsensusMemoryManager.getInstance()
.init(
- config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus(),
- config.getMultiLeaderConfig().getReplication().getAllocateMemoryForQueue());
+ config.getIoTConsensusConfig().getReplication().getAllocateMemoryForConsensus(),
+ config.getIoTConsensusConfig().getReplication().getAllocateMemoryForQueue());
}
@Override
public void start() throws IOException {
initAndRecover();
- service.initAsyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
+ service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
try {
registerManager.register(service);
} catch (StartupException e) {
@@ -127,8 +127,8 @@ public class MultiLeaderConsensus implements IConsensus {
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.create(
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
- MultiLeaderServerImpl consensus =
- new MultiLeaderServerImpl(
+ IoTConsensusServerImpl consensus =
+ new IoTConsensusServerImpl(
path.toString(),
new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
@@ -146,13 +146,13 @@ public class MultiLeaderConsensus implements IConsensus {
@Override
public void stop() {
clientManager.close();
- stateMachineMap.values().parallelStream().forEach(MultiLeaderServerImpl::stop);
+ stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
registerManager.deregisterAll();
}
@Override
public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
- MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
if (impl == null) {
return ConsensusWriteResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
@@ -175,7 +175,7 @@ public class MultiLeaderConsensus implements IConsensus {
@Override
public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
- MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
if (impl == null) {
return ConsensusReadResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
@@ -207,8 +207,8 @@ public class MultiLeaderConsensus implements IConsensus {
if (!file.mkdirs()) {
logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
}
- MultiLeaderServerImpl impl =
- new MultiLeaderServerImpl(
+ IoTConsensusServerImpl impl =
+ new IoTConsensusServerImpl(
path,
new Peer(groupId, thisNodeId, thisNode),
peers,
@@ -249,7 +249,7 @@ public class MultiLeaderConsensus implements IConsensus {
@Override
public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
- MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
if (impl == null) {
return ConsensusGenericResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
@@ -257,31 +257,31 @@ public class MultiLeaderConsensus implements IConsensus {
}
try {
// step 1: inactive new Peer to prepare for following steps
- logger.info("[MultiLeaderConsensus] inactivate new peer: {}", peer);
+ logger.info("[IoTConsensus] inactivate new peer: {}", peer);
impl.inactivePeer(peer);
// step 2: notify all the other Peers to build the sync connection to newPeer
- logger.info("[MultiLeaderConsensus] notify current peers to build sync log...");
+ logger.info("[IoTConsensus] notify current peers to build sync log...");
impl.notifyPeersToBuildSyncLogChannel(peer);
// step 3: take snapshot
- logger.info("[MultiLeaderConsensus] start to take snapshot...");
+ logger.info("[IoTConsensus] start to take snapshot...");
impl.takeSnapshot();
// step 4: transit snapshot
- logger.info("[MultiLeaderConsensus] start to transit snapshot...");
+ logger.info("[IoTConsensus] start to transit snapshot...");
impl.transitSnapshot(peer);
// step 5: let the new peer load snapshot
- logger.info("[MultiLeaderConsensus] trigger new peer to load snapshot...");
+ logger.info("[IoTConsensus] trigger new peer to load snapshot...");
impl.triggerSnapshotLoad(peer);
// step 6: active new Peer
- logger.info("[MultiLeaderConsensus] activate new peer...");
+ logger.info("[IoTConsensus] activate new peer...");
impl.activePeer(peer);
// step 7: spot clean
- logger.info("[MultiLeaderConsensus] do spot clean...");
+ logger.info("[IoTConsensus] do spot clean...");
doSpotClean(peer, impl);
} catch (ConsensusGroupModifyPeerException e) {
@@ -295,17 +295,17 @@ public class MultiLeaderConsensus implements IConsensus {
return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
}
- private void doSpotClean(Peer peer, MultiLeaderServerImpl impl) {
+ private void doSpotClean(Peer peer, IoTConsensusServerImpl impl) {
try {
impl.cleanupRemoteSnapshot(peer);
} catch (ConsensusGroupModifyPeerException e) {
- logger.warn("[MultiLeaderConsensus] failed to cleanup remote snapshot", e);
+ logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
}
}
@Override
public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
- MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
if (impl == null) {
return ConsensusGenericResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
@@ -351,7 +351,7 @@ public class MultiLeaderConsensus implements IConsensus {
@Override
public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
- MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
if (impl == null) {
return ConsensusGenericResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
@@ -386,7 +386,7 @@ public class MultiLeaderConsensus implements IConsensus {
return new ArrayList<>(stateMachineMap.keySet());
}
- public MultiLeaderServerImpl getImpl(ConsensusGroupId groupId) {
+ public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
return stateMachineMap.get(groupId);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
similarity index 87%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index a1d5c183d5..0256d489c0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader;
+package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -30,30 +30,30 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
-import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
-import org.apache.iotdb.consensus.multileader.snapshot.SnapshotFragmentReader;
-import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
-import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
-import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
-import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
-import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotReq;
-import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotRes;
-import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
-import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
-import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
-import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelRes;
-import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
-import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
-import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
-import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
-import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq;
-import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
+import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
+import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
+import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotReq;
+import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotRes;
+import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
+import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelReq;
+import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelRes;
+import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
+import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentRes;
+import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
+import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -81,13 +81,13 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class MultiLeaderServerImpl {
+public class IoTConsensusServerImpl {
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
private static final String SNAPSHOT_DIR_NAME = "snapshot";
- private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(IoTConsensusServerImpl.class);
private final Peer thisNode;
private final IStateMachine stateMachine;
@@ -97,23 +97,23 @@ public class MultiLeaderServerImpl {
private final List<Peer> configuration;
private final AtomicLong index;
private final LogDispatcher logDispatcher;
- private final MultiLeaderConfig config;
+ private final IoTConsensusConfig config;
private final ConsensusReqReader reader;
private volatile boolean active;
private String latestSnapshotId;
- private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
- private final MultiLeaderServerMetrics metrics;
+ private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
+ private final IoTConsensusServerMetrics metrics;
private final String consensusGroupId;
- public MultiLeaderServerImpl(
+ public IoTConsensusServerImpl(
String storageDir,
Peer thisNode,
List<Peer> configuration,
IStateMachine stateMachine,
- IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager,
- IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager,
- MultiLeaderConfig config) {
+ IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
+ IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
+ IoTConsensusConfig config) {
this.active = true;
this.storageDir = storageDir;
this.thisNode = thisNode;
@@ -135,7 +135,7 @@ public class MultiLeaderServerImpl {
}
this.index = new AtomicLong(currentSearchIndex);
this.consensusGroupId = thisNode.getGroupId().toString();
- this.metrics = new MultiLeaderServerMetrics(this);
+ this.metrics = new IoTConsensusServerMetrics(this);
}
public IStateMachine getStateMachine() {
@@ -308,7 +308,7 @@ public class MultiLeaderServerImpl {
File snapshotDir = new File(storageDir, latestSnapshotId);
List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
logger.info("transit snapshots: {}", snapshotPaths);
- try (SyncMultiLeaderServiceClient client =
+ try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
for (Path path : snapshotPaths) {
SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path);
@@ -369,7 +369,8 @@ public class MultiLeaderServerImpl {
}
public void inactivePeer(Peer peer) throws ConsensusGroupModifyPeerException {
- try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+ try (SyncIoTConsensusServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
TInactivatePeerRes res =
client.inactivatePeer(
new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
@@ -384,7 +385,8 @@ public class MultiLeaderServerImpl {
}
public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
- try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+ try (SyncIoTConsensusServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
TTriggerSnapshotLoadRes res =
client.triggerSnapshotLoad(
new TTriggerSnapshotLoadReq(
@@ -400,7 +402,8 @@ public class MultiLeaderServerImpl {
}
public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
- try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
+ try (SyncIoTConsensusServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
TActivatePeerRes res =
client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
if (!isSuccess(res.status)) {
@@ -419,18 +422,18 @@ public class MultiLeaderServerImpl {
// configuration
List<Peer> currentMembers = new ArrayList<>(this.configuration);
logger.info(
- "[MultiLeaderConsensus] notify current peers to build sync log. group member: {}, target: {}",
+ "[IoTConsensus] notify current peers to build sync log. group member: {}, target: {}",
currentMembers,
targetPeer);
for (Peer peer : currentMembers) {
- logger.info("[MultiLeaderConsensus] build sync log channel from {}", peer);
+ logger.info("[IoTConsensus] build sync log channel from {}", peer);
if (peer.equals(thisNode)) {
// use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
// snapshot produced by thisNode
buildSyncLogChannel(targetPeer, index.get());
} else {
// use RPC to tell other peers to build sync log channel to target peer
- try (SyncMultiLeaderServiceClient client =
+ try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TBuildSyncLogChannelRes res =
client.buildSyncLogChannel(
@@ -449,7 +452,7 @@ public class MultiLeaderServerImpl {
// we will skip the peer which cannot be reached.
// If following error message appears, the un-responsible peer should be removed manually
// after current operation
- // TODO: (xingtanzjr) design more reliable way for MultiLeaderConsensus
+ // TODO: (xingtanzjr) design more reliable way for IoTConsensus
logger.error(
"cannot notify {} to build sync log channel. Please check the status of this node manually",
peer,
@@ -473,7 +476,7 @@ public class MultiLeaderServerImpl {
removeSyncLogChannel(targetPeer);
} else {
// use RPC to tell other peers to build sync log channel to target peer
- try (SyncMultiLeaderServiceClient client =
+ try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TRemoveSyncLogChannelRes res =
client.removeSyncLogChannel(
@@ -496,7 +499,7 @@ public class MultiLeaderServerImpl {
public void waitTargetPeerUntilSyncLogCompleted(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
long checkIntervalInMs = 10_000L;
- try (SyncMultiLeaderServiceClient client =
+ try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
while (true) {
TWaitSyncLogCompleteRes res =
@@ -545,14 +548,14 @@ public class MultiLeaderServerImpl {
throws ConsensusGroupModifyPeerException {
// step 1, build sync channel in LogDispatcher
logger.info(
- "[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}",
+ "[IoTConsensus] build sync log channel to {} with initialSyncIndex {}",
targetPeer,
initialSyncIndex);
logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
// step 2, update configuration
configuration.add(targetPeer);
// step 3, persist configuration
- logger.info("[MultiLeaderConsensus] persist new configuration: {}", configuration);
+ logger.info("[IoTConsensus] persist new configuration: {}", configuration);
persistConfigurationUpdate();
}
@@ -560,12 +563,12 @@ public class MultiLeaderServerImpl {
try {
// step 1, remove sync channel in LogDispatcher
logDispatcher.removeLogDispatcherThread(targetPeer);
- logger.info("[MultiLeaderConsensus] log dispatcher to {} removed and cleanup", targetPeer);
+ logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", targetPeer);
// step 2, update configuration
configuration.remove(targetPeer);
// step 3, persist configuration
persistConfigurationUpdate();
- logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration);
+ logger.info("[IoTConsensus] configuration updated to {}", this.configuration);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException("error when remove LogDispatcherThread", e);
}
@@ -579,7 +582,7 @@ public class MultiLeaderServerImpl {
Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()),
publicBAOS.getBuf());
} catch (IOException e) {
- // TODO: (xingtanzjr) need to handle the IOException because the MultiLeaderConsensus won't
+ // TODO: (xingtanzjr) need to handle the IOException because the IoTConsensus won't
// work expectedly
// if the exception occurs
logger.error("Unexpected error occurs when persisting configuration", e);
@@ -631,7 +634,7 @@ public class MultiLeaderServerImpl {
for (int i = 0; i < size; i++) {
configuration.add(Peer.deserialize(buffer));
}
- logger.info("Recover multiLeader, configuration: {}", configuration);
+ logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration);
} catch (IOException e) {
logger.error("Unexpected error occurs when recovering configuration", e);
}
@@ -672,7 +675,7 @@ public class MultiLeaderServerImpl {
return index.get();
}
- public MultiLeaderConfig getConfig() {
+ public IoTConsensusConfig getConfig() {
return config;
}
@@ -711,7 +714,7 @@ public class MultiLeaderServerImpl {
}
public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
- try (SyncMultiLeaderServiceClient client =
+ try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
TCleanupTransferredSnapshotReq req =
new TCleanupTransferredSnapshotReq(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
similarity index 89%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerMetrics.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index dc03f7edac..d8d15542f9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader;
+package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
@@ -27,10 +27,10 @@ import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
-public class MultiLeaderServerMetrics implements IMetricSet {
- private final MultiLeaderServerImpl impl;
+public class IoTConsensusServerMetrics implements IMetricSet {
+ private final IoTConsensusServerImpl impl;
- public MultiLeaderServerMetrics(MultiLeaderServerImpl impl) {
+ public IoTConsensusServerMetrics(IoTConsensusServerImpl impl) {
this.impl = impl;
}
@@ -41,7 +41,7 @@ public class MultiLeaderServerMetrics implements IMetricSet {
Metric.MULTI_LEADER.toString(),
MetricLevel.IMPORTANT,
impl,
- MultiLeaderServerImpl::getIndex,
+ IoTConsensusServerImpl::getIndex,
Tag.NAME.toString(),
"multiLeaderServerImpl",
Tag.REGION.toString(),
@@ -53,7 +53,7 @@ public class MultiLeaderServerMetrics implements IMetricSet {
Metric.MULTI_LEADER.toString(),
MetricLevel.IMPORTANT,
impl,
- MultiLeaderServerImpl::getCurrentSafelyDeletedSearchIndex,
+ IoTConsensusServerImpl::getCurrentSafelyDeletedSearchIndex,
Tag.NAME.toString(),
"multiLeaderServerImpl",
Tag.REGION.toString(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
similarity index 79%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index e4a9f4e3c4..1f08dabbc6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -17,13 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.client;
+package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.commons.pool2.PooledObject;
@@ -35,19 +35,20 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class AsyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.AsyncClient {
+public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncClient {
- private static final Logger logger = LoggerFactory.getLogger(AsyncMultiLeaderServiceClient.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(AsyncIoTConsensusServiceClient.class);
private final TEndPoint endpoint;
- private final ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
+ private final ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
- public AsyncMultiLeaderServiceClient(
+ public AsyncIoTConsensusServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
TEndPoint endpoint,
TAsyncClientManager tClientManager,
- ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager)
+ ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager)
throws IOException {
super(
protocolFactory,
@@ -108,10 +109,10 @@ public class AsyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.
}
public static class Factory
- extends AsyncBaseClientFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
+ extends AsyncBaseClientFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
public Factory(
- ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager,
+ ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
ClientFactoryProperty clientFactoryProperty,
String threadName) {
super(clientManager, clientFactoryProperty, threadName);
@@ -119,17 +120,17 @@ public class AsyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.
@Override
public void destroyObject(
- TEndPoint endPoint, PooledObject<AsyncMultiLeaderServiceClient> pooledObject) {
+ TEndPoint endPoint, PooledObject<AsyncIoTConsensusServiceClient> pooledObject) {
pooledObject.getObject().close();
}
@Override
- public PooledObject<AsyncMultiLeaderServiceClient> makeObject(TEndPoint endPoint)
+ public PooledObject<AsyncIoTConsensusServiceClient> makeObject(TEndPoint endPoint)
throws Exception {
TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
tManager = tManager == null ? new TAsyncClientManager() : tManager;
return new DefaultPooledObject<>(
- new AsyncMultiLeaderServiceClient(
+ new AsyncIoTConsensusServiceClient(
clientFactoryProperty.getProtocolFactory(),
clientFactoryProperty.getConnectionTimeoutMs(),
endPoint,
@@ -139,7 +140,7 @@ public class AsyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.
@Override
public boolean validateObject(
- TEndPoint endPoint, PooledObject<AsyncMultiLeaderServiceClient> pooledObject) {
+ TEndPoint endPoint, PooledObject<AsyncIoTConsensusServiceClient> pooledObject) {
return pooledObject.getObject() != null && pooledObject.getObject().isReady();
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
similarity index 93%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 468bbc1e27..87b8ac9349 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.client;
+package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
-import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
+import org.apache.iotdb.consensus.iot.logdispatcher.PendingBatch;
+import org.apache.iotdb.consensus.iot.thrift.TSyncLogRes;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
similarity index 61%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index 8e2d23af7d..3c7f4a4b8d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -17,35 +17,35 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.client;
+package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientPoolFactory;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-public class MultiLeaderConsensusClientPool {
+public class IoTConsensusClientPool {
- private MultiLeaderConsensusClientPool() {}
+ private IoTConsensusClientPool() {}
- public static class SyncMultiLeaderServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, SyncMultiLeaderServiceClient> {
- private final MultiLeaderConfig config;
+ public static class SyncIoTConsensusServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, SyncIoTConsensusServiceClient> {
+ private final IoTConsensusConfig config;
- public SyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
+ public SyncIoTConsensusServiceClientPoolFactory(IoTConsensusConfig config) {
this.config = config;
}
@Override
- public KeyedObjectPool<TEndPoint, SyncMultiLeaderServiceClient> createClientPool(
- ClientManager<TEndPoint, SyncMultiLeaderServiceClient> manager) {
+ public KeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> createClientPool(
+ ClientManager<TEndPoint, SyncIoTConsensusServiceClient> manager) {
return new GenericKeyedObjectPool<>(
- new SyncMultiLeaderServiceClient.Factory(
+ new SyncIoTConsensusServiceClient.Factory(
manager,
new ClientFactoryProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
@@ -53,26 +53,25 @@ public class MultiLeaderConsensusClientPool {
.setSelectorNumOfAsyncClientManager(
config.getRpc().getSelectorNumOfClientManager())
.build()),
- new ClientPoolProperty.Builder<SyncMultiLeaderServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>().build().getConfig());
}
}
- public static class AsyncMultiLeaderServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
+ public static class AsyncIoTConsensusServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
- private final MultiLeaderConfig config;
- private static final String MULTI_LEADER_CONSENSUS_CLIENT_POOL_THREAD_NAME =
- "MultiLeaderConsensusClientPool";
+ private final IoTConsensusConfig config;
+ private static final String IOT_CONSENSUS_CLIENT_POOL_THREAD_NAME = "IoTConsensusClientPool";
- public AsyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
+ public AsyncIoTConsensusServiceClientPoolFactory(IoTConsensusConfig config) {
this.config = config;
}
@Override
- public KeyedObjectPool<TEndPoint, AsyncMultiLeaderServiceClient> createClientPool(
- ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> manager) {
+ public KeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient> createClientPool(
+ ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> manager) {
return new GenericKeyedObjectPool<>(
- new AsyncMultiLeaderServiceClient.Factory(
+ new AsyncIoTConsensusServiceClient.Factory(
manager,
new ClientFactoryProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
@@ -80,8 +79,8 @@ public class MultiLeaderConsensusClientPool {
.setSelectorNumOfAsyncClientManager(
config.getRpc().getSelectorNumOfClientManager())
.build(),
- MULTI_LEADER_CONSENSUS_CLIENT_POOL_THREAD_NAME),
- new ClientPoolProperty.Builder<AsyncMultiLeaderServiceClient>().build().getConfig());
+ IOT_CONSENSUS_CLIENT_POOL_THREAD_NAME),
+ new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>().build().getConfig());
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
similarity index 78%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
index 3cbb3ae1e3..06fb777810 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/SyncMultiLeaderServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.client;
+package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.BaseClientFactory;
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.sync.SyncThriftClient;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
@@ -40,17 +40,17 @@ import org.apache.thrift.transport.TTransportException;
import java.lang.reflect.Constructor;
import java.net.SocketException;
-public class SyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.Client
+public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
implements SyncThriftClient, AutoCloseable {
private final TEndPoint endPoint;
- private final ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager;
+ private final ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager;
- public SyncMultiLeaderServiceClient(
+ public SyncIoTConsensusServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
TEndPoint endPoint,
- ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager)
+ ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager)
throws TTransportException {
super(
protocolFactory.getProtocol(
@@ -71,7 +71,7 @@ public class SyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.C
}
@TestOnly
- public ClientManager<TEndPoint, SyncMultiLeaderServiceClient> getClientManager() {
+ public ClientManager<TEndPoint, SyncIoTConsensusServiceClient> getClientManager() {
return clientManager;
}
@@ -101,32 +101,32 @@ public class SyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.C
@Override
public String toString() {
- return String.format("SyncMultiLeaderServiceClient{%s}", endPoint);
+ return String.format("SyncIoTConsensusServiceClient{%s}", endPoint);
}
- public static class Factory extends BaseClientFactory<TEndPoint, SyncMultiLeaderServiceClient> {
+ public static class Factory extends BaseClientFactory<TEndPoint, SyncIoTConsensusServiceClient> {
public Factory(
- ClientManager<TEndPoint, SyncMultiLeaderServiceClient> clientManager,
+ ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager,
ClientFactoryProperty clientFactoryProperty) {
super(clientManager, clientFactoryProperty);
}
@Override
public void destroyObject(
- TEndPoint endpoint, PooledObject<SyncMultiLeaderServiceClient> pooledObject) {
+ TEndPoint endpoint, PooledObject<SyncIoTConsensusServiceClient> pooledObject) {
pooledObject.getObject().invalidate();
}
@Override
- public PooledObject<SyncMultiLeaderServiceClient> makeObject(TEndPoint endpoint)
+ public PooledObject<SyncIoTConsensusServiceClient> makeObject(TEndPoint endpoint)
throws Exception {
- Constructor<SyncMultiLeaderServiceClient> constructor =
- SyncMultiLeaderServiceClient.class.getConstructor(
+ Constructor<SyncIoTConsensusServiceClient> constructor =
+ SyncIoTConsensusServiceClient.class.getConstructor(
TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
return new DefaultPooledObject<>(
SyncThriftClientWithErrorHandler.newErrorHandler(
- SyncMultiLeaderServiceClient.class,
+ SyncIoTConsensusServiceClient.class,
constructor,
clientFactoryProperty.getProtocolFactory(),
clientFactoryProperty.getConnectionTimeoutMs(),
@@ -136,7 +136,7 @@ public class SyncMultiLeaderServiceClient extends MultiLeaderConsensusIService.C
@Override
public boolean validateObject(
- TEndPoint endpoint, PooledObject<SyncMultiLeaderServiceClient> pooledObject) {
+ TEndPoint endpoint, PooledObject<SyncIoTConsensusServiceClient> pooledObject) {
return pooledObject.getObject() != null
&& pooledObject.getObject().getInputProtocol().getTransport().isOpen();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index cd52802e4a..e5242c6611 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.utils.TestOnly;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
similarity index 85%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index 9da0e7ad4c..dde4753b2a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -27,14 +27,14 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class MultiLeaderMemoryManager {
- private static final Logger logger = LoggerFactory.getLogger(MultiLeaderMemoryManager.class);
+public class IoTConsensusMemoryManager {
+ private static final Logger logger = LoggerFactory.getLogger(IoTConsensusMemoryManager.class);
private final AtomicLong memorySizeInByte = new AtomicLong(0);
private Long maxMemorySizeInByte = Runtime.getRuntime().maxMemory() / 10;
private Long maxMemorySizeForQueueInByte = Runtime.getRuntime().maxMemory() / 100 * 6;
- private MultiLeaderMemoryManager() {
- MetricService.getInstance().addMetricSet(new MultiLeaderMemoryManagerMetrics(this));
+ private IoTConsensusMemoryManager() {
+ MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this));
}
public boolean reserve(long size, boolean fromQueue) {
@@ -82,9 +82,9 @@ public class MultiLeaderMemoryManager {
return memorySizeInByte.get();
}
- private static final MultiLeaderMemoryManager INSTANCE = new MultiLeaderMemoryManager();
+ private static final IoTConsensusMemoryManager INSTANCE = new IoTConsensusMemoryManager();
- public static MultiLeaderMemoryManager getInstance() {
+ public static IoTConsensusMemoryManager getInstance() {
return INSTANCE;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManagerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerMetrics.java
similarity index 78%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManagerMetrics.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerMetrics.java
index 047335d6d4..59c71f6017 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManagerMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerMetrics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -26,11 +26,11 @@ import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
-public class MultiLeaderMemoryManagerMetrics implements IMetricSet {
- private final MultiLeaderMemoryManager multiLeaderMemoryManager;
+public class IoTConsensusMemoryManagerMetrics implements IMetricSet {
+ private final IoTConsensusMemoryManager iotConsensusMemoryManager;
- public MultiLeaderMemoryManagerMetrics(MultiLeaderMemoryManager multiLeaderMemoryManager) {
- this.multiLeaderMemoryManager = multiLeaderMemoryManager;
+ public IoTConsensusMemoryManagerMetrics(IoTConsensusMemoryManager iotConsensusMemoryManager) {
+ this.iotConsensusMemoryManager = iotConsensusMemoryManager;
}
@Override
@@ -38,8 +38,8 @@ public class MultiLeaderMemoryManagerMetrics implements IMetricSet {
metricService.createAutoGauge(
Metric.MEM.toString(),
MetricLevel.IMPORTANT,
- multiLeaderMemoryManager,
- MultiLeaderMemoryManager::getMemorySizeInByte,
+ iotConsensusMemoryManager,
+ IoTConsensusMemoryManager::getMemorySizeInByte,
Tag.NAME.toString(),
"MultiLeaderConsensus");
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
similarity index 91%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c5afe13c14..886d1e1818 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
@@ -27,14 +27,14 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
-import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
+import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
+import org.apache.iotdb.consensus.iot.thrift.TSyncLogReq;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.ratis.Utils;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -59,17 +59,17 @@ public class LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
- private final MultiLeaderServerImpl impl;
+ private final IoTConsensusServerImpl impl;
private final List<LogDispatcherThread> threads;
private final String selfPeerId;
- private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
+ private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
private ExecutorService executorService;
private boolean stopped = false;
public LogDispatcher(
- MultiLeaderServerImpl impl,
- IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
+ IoTConsensusServerImpl impl,
+ IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
this.impl = impl;
this.selfPeerId = impl.getThisNode().getEndpoint().toString();
this.clientManager = clientManager;
@@ -177,7 +177,7 @@ public class LogDispatcher {
private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
private static final long START_INDEX = 1;
- private final MultiLeaderConfig config;
+ private final IoTConsensusConfig config;
private final Peer peer;
private final IndexController controller;
// A sliding window class that manages asynchronously pendingBatches
@@ -189,15 +189,15 @@ public class LogDispatcher {
// A reader management class that gets requests from the DataRegion
private final ConsensusReqReader reader =
(ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
- private final MultiLeaderMemoryManager multiLeaderMemoryManager =
- MultiLeaderMemoryManager.getInstance();
+ private final IoTConsensusMemoryManager iotConsensusMemoryManager =
+ IoTConsensusMemoryManager.getInstance();
private volatile boolean stopped = false;
private final ConsensusReqReader.ReqIterator walEntryIterator;
private final LogDispatcherThreadMetrics metrics;
- public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long initialSyncIndex) {
+ public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
this.peer = peer;
this.config = config;
this.pendingRequest = new LinkedBlockingQueue<>();
@@ -224,7 +224,7 @@ public class LogDispatcher {
return peer;
}
- public MultiLeaderConfig getConfig() {
+ public IoTConsensusConfig getConfig() {
return config;
}
@@ -238,7 +238,7 @@ public class LogDispatcher {
/** try to offer a request into queue with memory control */
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
- if (!multiLeaderMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
+ if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
return false;
}
boolean success;
@@ -246,19 +246,19 @@ public class LogDispatcher {
success = pendingRequest.offer(indexedConsensusRequest);
} catch (Throwable t) {
// If exception occurs during request offer, the reserved memory should be released
- multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+ iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize());
throw t;
}
if (!success) {
// If offer failed, the reserved memory should be released
- multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+ iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize());
}
return success;
}
/** try to remove a request from queue with memory control */
private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
- multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+ iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize());
}
public void stop() {
@@ -268,12 +268,12 @@ public class LogDispatcher {
requestSize += indexedConsensusRequest.getSerializedSize();
}
pendingRequest.clear();
- multiLeaderMemoryManager.free(requestSize);
+ iotConsensusMemoryManager.free(requestSize);
requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : bufferedRequest) {
requestSize += indexedConsensusRequest.getSerializedSize();
}
- multiLeaderMemoryManager.free(requestSize);
+ iotConsensusMemoryManager.free(requestSize);
syncStatus.free();
MetricService.getInstance().removeMetricSet(metrics);
}
@@ -441,7 +441,7 @@ public class LogDispatcher {
public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) {
try {
- AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint());
+ AsyncIoTConsensusServiceClient client = clientManager.borrowClient(peer.getEndpoint());
TSyncLogReq req =
new TSyncLogReq(
selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcherThreadMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcherThreadMetrics.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
index c8a9412cc9..cbf1f1203a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcherThreadMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/PendingBatch.java
similarity index 90%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/PendingBatch.java
index dbfb52d515..73d1b9117c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/PendingBatch.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
import java.nio.Buffer;
import java.util.ArrayList;
@@ -28,7 +28,7 @@ import java.util.List;
public class PendingBatch {
- private final MultiLeaderConfig config;
+ private final IoTConsensusConfig config;
private long startIndex;
private long endIndex;
@@ -39,7 +39,7 @@ public class PendingBatch {
// indicates whether this batch has been successfully synchronized to another node
private boolean synced;
- public PendingBatch(MultiLeaderConfig config) {
+ public PendingBatch(IoTConsensusConfig config) {
this.config = config;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
similarity index 84%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 25af8b8fcd..bacf3f47d2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import java.util.Iterator;
import java.util.LinkedList;
@@ -28,13 +28,13 @@ import java.util.List;
public class SyncStatus {
- private final MultiLeaderConfig config;
+ private final IoTConsensusConfig config;
private final IndexController controller;
private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>();
- private final MultiLeaderMemoryManager multiLeaderMemoryManager =
- MultiLeaderMemoryManager.getInstance();
+ private final IoTConsensusMemoryManager iotConsensusMemoryManager =
+ IoTConsensusMemoryManager.getInstance();
- public SyncStatus(IndexController controller, MultiLeaderConfig config) {
+ public SyncStatus(IndexController controller, IoTConsensusConfig config) {
this.controller = controller;
this.config = config;
}
@@ -43,7 +43,7 @@ public class SyncStatus {
public void addNextBatch(PendingBatch batch) throws InterruptedException {
synchronized (this) {
while (pendingBatches.size() >= config.getReplication().getMaxPendingBatch()
- || !multiLeaderMemoryManager.reserve(batch.getSerializedSize(), false)) {
+ || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), false)) {
wait();
}
pendingBatches.add(batch);
@@ -64,7 +64,7 @@ public class SyncStatus {
while (current.isSynced()) {
controller.updateAndGet(current.getEndIndex());
iterator.remove();
- multiLeaderMemoryManager.free(current.getSerializedSize());
+ iotConsensusMemoryManager.free(current.getSerializedSize());
if (iterator.hasNext()) {
current = iterator.next();
} else {
@@ -83,7 +83,7 @@ public class SyncStatus {
size += pendingBatch.getSerializedSize();
}
pendingBatches.clear();
- multiLeaderMemoryManager.free(size);
+ iotConsensusMemoryManager.free(size);
}
/** Gets the first index that is not currently synchronized */
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
similarity index 71%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
index d7b41f7818..6777873a5d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.service;
+package org.apache.iotdb.consensus.iot.service;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -26,44 +26,44 @@ import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.thrift.TBaseAsyncProcessor;
import java.lang.reflect.InvocationTargetException;
-public class MultiLeaderRPCService extends ThriftService implements MultiLeaderRPCServiceMBean {
+public class IoTConsensusRPCService extends ThriftService implements IoTConsensusRPCServiceMBean {
private final TEndPoint thisNode;
- private final MultiLeaderConfig config;
- private MultiLeaderRPCServiceProcessor multiLeaderRPCServiceProcessor;
+ private final IoTConsensusConfig config;
+ private IoTConsensusRPCServiceProcessor iotConsensusRPCServiceProcessor;
- public MultiLeaderRPCService(TEndPoint thisNode, MultiLeaderConfig config) {
+ public IoTConsensusRPCService(TEndPoint thisNode, IoTConsensusConfig config) {
this.thisNode = thisNode;
this.config = config;
}
@Override
public ServiceType getID() {
- return ServiceType.MULTI_LEADER_CONSENSUS_SERVICE;
+ return ServiceType.IOT_CONSENSUS_SERVICE;
}
@Override
- public void initAsyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
- this.multiLeaderRPCServiceProcessor =
- (MultiLeaderRPCServiceProcessor) multiLeaderRPCServiceProcessor;
+ public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
+ this.iotConsensusRPCServiceProcessor =
+ (IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor;
super.mbeanName =
String.format(
"%s:%s=%s", this.getClass().getPackage(), IoTDBConstant.JMX_TYPE, getID().getJmxName());
- super.initAsyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
+ super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor);
}
@Override
public void initTProcessor()
throws ClassNotFoundException, IllegalAccessException, InstantiationException,
NoSuchMethodException, InvocationTargetException {
- processor = new MultiLeaderConsensusIService.AsyncProcessor<>(multiLeaderRPCServiceProcessor);
+ processor = new IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor);
}
@Override
@@ -74,14 +74,14 @@ public class MultiLeaderRPCService extends ThriftService implements MultiLeaderR
new ThriftServiceThread(
(TBaseAsyncProcessor) processor,
getID().getName(),
- ThreadName.MULTI_LEADER_CONSENSUS_RPC_PROCESSOR.getName(),
+ ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
config.getRpc().getRpcSelectorThreadNum(),
config.getRpc().getRpcMinConcurrentClientNum(),
config.getRpc().getRpcMaxConcurrentClientNum(),
config.getRpc().getThriftServerAwaitTimeForStopService(),
- new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
+ new IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor),
config.getRpc().isRpcThriftCompressionEnabled(),
config.getRpc().getConnectionTimeoutInMs(),
config.getRpc().getThriftMaxFrameSize(),
@@ -89,7 +89,7 @@ public class MultiLeaderRPCService extends ThriftService implements MultiLeaderR
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
- thriftServiceThread.setName(ThreadName.MULTI_LEADER_CONSENSUS_RPC_SERVICE.getName());
+ thriftServiceThread.setName(ThreadName.IOT_CONSENSUS_RPC_SERVICE.getName());
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceHandler.java
similarity index 84%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceHandler.java
index 889ac7d517..621f887de3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceHandler.java
@@ -17,18 +17,18 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.service;
+package org.apache.iotdb.consensus.iot.service;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
-public class MultiLeaderRPCServiceHandler implements TServerEventHandler {
+public class IoTConsensusRPCServiceHandler implements TServerEventHandler {
- private final MultiLeaderRPCServiceProcessor processor;
+ private final IoTConsensusRPCServiceProcessor processor;
- public MultiLeaderRPCServiceHandler(MultiLeaderRPCServiceProcessor processor) {
+ public IoTConsensusRPCServiceHandler(IoTConsensusRPCServiceProcessor processor) {
this.processor = processor;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceMBean.java
similarity index 88%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceMBean.java
index 9e354ac204..953df2ed3f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceMBean.java
@@ -17,6 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.service;
+package org.apache.iotdb.consensus.iot.service;
-public interface MultiLeaderRPCServiceMBean {}
+public interface IoTConsensusRPCServiceMBean {}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
similarity index 81%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 249b4fe744..be2df4ae91 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -17,37 +17,37 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.service;
+package org.apache.iotdb.consensus.iot.service;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
-import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
-import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
-import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
-import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
-import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
-import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
-import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
-import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotReq;
-import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotRes;
-import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
-import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
-import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
-import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelRes;
-import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
-import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
-import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
-import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
-import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq;
-import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
+import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
+import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotReq;
+import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotRes;
+import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
+import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
+import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelReq;
+import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelRes;
+import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
+import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentRes;
+import org.apache.iotdb.consensus.iot.thrift.TSyncLogReq;
+import org.apache.iotdb.consensus.iot.thrift.TSyncLogRes;
+import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
+import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -58,13 +58,13 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.stream.Collectors;
-public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
+public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.AsyncIface {
- private final Logger logger = LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
+ private final Logger logger = LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class);
- private final MultiLeaderConsensus consensus;
+ private final IoTConsensus consensus;
- public MultiLeaderRPCServiceProcessor(MultiLeaderConsensus consensus) {
+ public IoTConsensusRPCServiceProcessor(IoTConsensus consensus) {
this.consensus = consensus;
}
@@ -73,7 +73,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
try {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format(
@@ -109,7 +109,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
batch.getData().stream()
.map(
batch.isFromWAL()
- ? MultiLeaderConsensusRequest::new
+ ? IoTConsensusRequest::new
: ByteBufferConsensusRequest::new)
.collect(Collectors.toList())));
}
@@ -128,7 +128,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
@@ -148,7 +148,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> resultHandler) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
@@ -169,7 +169,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
@@ -196,7 +196,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
@@ -223,7 +223,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for waitSyncLogComplete request", groupId);
@@ -245,7 +245,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
@@ -272,7 +272,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
@@ -294,7 +294,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
similarity index 93%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
index ea042966ad..a249bad90f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragment.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
@@ -17,9 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.snapshot;
+package org.apache.iotdb.consensus.iot.snapshot;
-import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
+import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
import java.nio.ByteBuffer;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
similarity index 97%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index 1509995aff..4bb4d173b9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/snapshot/SnapshotFragmentReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.snapshot;
+package org.apache.iotdb.consensus.iot.snapshot;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
index 48a13c8df7..81ce0666a5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.wal;
+package org.apache.iotdb.consensus.iot.wal;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/GetConsensusReqReaderPlan.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/GetConsensusReqReaderPlan.java
similarity index 95%
rename from consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/GetConsensusReqReaderPlan.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/GetConsensusReqReaderPlan.java
index 303f2eeb8f..fe4ddbfd0f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/GetConsensusReqReaderPlan.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/GetConsensusReqReaderPlan.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.wal;
+package org.apache.iotdb.consensus.iot.wal;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/IoTConsensusTest.java
similarity index 94%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/IoTConsensusTest.java
index 53bb885d84..6f21adc5b6 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/IoTConsensusTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader;
+package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -26,8 +26,8 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
-import org.apache.iotdb.consensus.multileader.util.TestEntry;
-import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
+import org.apache.iotdb.consensus.iot.util.TestEntry;
+import org.apache.iotdb.consensus.iot.util.TestStateMachine;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -43,9 +43,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-public class MultiLeaderConsensusTest {
+public class IoTConsensusTest {
private static final long CHECK_POINT_GAP = 500;
- private final Logger logger = LoggerFactory.getLogger(MultiLeaderConsensusTest.class);
+ private final Logger logger = LoggerFactory.getLogger(IoTConsensusTest.class);
private final ConsensusGroupId gid = new DataRegionId(1);
@@ -62,7 +62,7 @@ public class MultiLeaderConsensusTest {
new File("target" + java.io.File.separator + "3"));
private final ConsensusGroup group = new ConsensusGroup(gid, peers);
- private final List<MultiLeaderConsensus> servers = new ArrayList<>();
+ private final List<IoTConsensus> servers = new ArrayList<>();
private final List<TestStateMachine> stateMachines = new ArrayList<>();
@Before
@@ -86,9 +86,9 @@ public class MultiLeaderConsensusTest {
for (int i = 0; i < 3; i++) {
int finalI = i;
servers.add(
- (MultiLeaderConsensus)
+ (IoTConsensus)
ConsensusFactory.getConsensusImpl(
- ConsensusFactory.MULTI_LEADER_CONSENSUS,
+ ConsensusFactory.IOT_CONSENSUS,
ConsensusConfig.newBuilder()
.setThisNodeId(peers.get(i).getNodeId())
.setThisNode(peers.get(i).getEndpoint())
@@ -100,13 +100,13 @@ public class MultiLeaderConsensusTest {
new IllegalArgumentException(
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
- ConsensusFactory.MULTI_LEADER_CONSENSUS))));
+ ConsensusFactory.IOT_CONSENSUS))));
servers.get(i).start();
}
}
private void stopServer() {
- servers.parallelStream().forEach(MultiLeaderConsensus::stop);
+ servers.parallelStream().forEach(IoTConsensus::stop);
servers.clear();
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/RecoveryTest.java
similarity index 92%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/RecoveryTest.java
index 5600089c05..d391faa17f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/RecoveryTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader;
+package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -27,7 +27,7 @@ import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
-import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
+import org.apache.iotdb.consensus.iot.util.TestStateMachine;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -47,7 +47,7 @@ public class RecoveryTest {
public void constructConsensus() throws IOException {
consensusImpl =
ConsensusFactory.getConsensusImpl(
- ConsensusFactory.MULTI_LEADER_CONSENSUS,
+ ConsensusFactory.IOT_CONSENSUS,
ConsensusConfig.newBuilder()
.setThisNodeId(1)
.setThisNode(new TEndPoint("0.0.0.0", 9000))
@@ -59,7 +59,7 @@ public class RecoveryTest {
new IllegalArgumentException(
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
- ConsensusFactory.MULTI_LEADER_CONSENSUS)));
+ ConsensusFactory.IOT_CONSENSUS)));
consensusImpl.start();
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
similarity index 98%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index 761ace4230..183c35e2ee 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
similarity index 92%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
index bec9c6c955..b0a85612f7 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.logdispatcher;
+package org.apache.iotdb.consensus.iot.logdispatcher;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -39,7 +39,7 @@ public class SyncStatusTest {
private static final File storageDir = new File("target" + java.io.File.separator + "test");
private static final String prefix = "version";
- private static final MultiLeaderConfig config = new MultiLeaderConfig.Builder().build();
+ private static final IoTConsensusConfig config = new IoTConsensusConfig.Builder().build();
private static final long CHECK_POINT_GAP = 500;
@Before
@@ -65,7 +65,7 @@ public class SyncStatusTest {
for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
TLogBatch logBatch = new TLogBatch();
logBatch.setSearchIndex(i);
- PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+ PendingBatch batch = new PendingBatch(IoTConsensusConfig.newBuilder().build());
batch.addTLogBatch(logBatch);
batch.buildIndex();
batchList.add(batch);
@@ -96,7 +96,7 @@ public class SyncStatusTest {
for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
TLogBatch logBatch = new TLogBatch();
logBatch.setSearchIndex(i);
- PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+ PendingBatch batch = new PendingBatch(IoTConsensusConfig.newBuilder().build());
batch.addTLogBatch(logBatch);
batch.buildIndex();
batchList.add(batch);
@@ -133,7 +133,7 @@ public class SyncStatusTest {
for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
TLogBatch logBatch = new TLogBatch();
logBatch.setSearchIndex(i);
- PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+ PendingBatch batch = new PendingBatch(IoTConsensusConfig.newBuilder().build());
batch.addTLogBatch(logBatch);
batch.buildIndex();
batchList.add(batch);
@@ -181,7 +181,7 @@ public class SyncStatusTest {
for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
TLogBatch logBatch = new TLogBatch();
logBatch.setSearchIndex(i);
- PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+ PendingBatch batch = new PendingBatch(IoTConsensusConfig.newBuilder().build());
batch.addTLogBatch(logBatch);
batch.buildIndex();
batchList.add(batch);
@@ -202,7 +202,7 @@ public class SyncStatusTest {
() -> {
TLogBatch logBatch = new TLogBatch();
logBatch.setSearchIndex(config.getReplication().getMaxPendingBatch());
- PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+ PendingBatch batch = new PendingBatch(IoTConsensusConfig.newBuilder().build());
batch.addTLogBatch(logBatch);
batch.buildIndex();
batchList.add(batch);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
similarity index 95%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
index d258d77b3b..c2560201a5 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.util;
+package org.apache.iotdb.consensus.iot.util;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/RequestSets.java
similarity index 96%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/util/RequestSets.java
index 5b16573280..afaa710fb8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/RequestSets.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.util;
+package org.apache.iotdb.consensus.iot.util;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestEntry.java
similarity index 91%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestEntry.java
index 874aa9b7f1..473244aaa2 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestEntry.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.util;
+package org.apache.iotdb.consensus.iot.util;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import java.io.DataOutputStream;
@@ -28,7 +28,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class TestEntry extends MultiLeaderConsensusRequest {
+public class TestEntry extends IoTConsensusRequest {
private final int num;
private final Peer peer;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
similarity index 95%
rename from consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
index 648c6282dd..672200daa3 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.multileader.util;
+package org.apache.iotdb.consensus.iot.util;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.IStateMachine;
@@ -26,8 +26,8 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/docs/UserGuide/Cluster/Cluster-Concept.md b/docs/UserGuide/Cluster/Cluster-Concept.md
index b54766df57..a04fe15d27 100644
--- a/docs/UserGuide/Cluster/Cluster-Concept.md
+++ b/docs/UserGuide/Cluster/Cluster-Concept.md
@@ -107,9 +107,9 @@ The figure contains 3 DataRegion groups, and the data_replication_factor is 3, s
Among multiple replicas of each region group, data consistency is guaranteed through a consensus protocol, which routes read and write requests to multiple replicas.
* Current supported consensus protocol
- * Standalone:Could only be used when replica is 1, which is the empty implementation of the consensus protocol.
- * MultiLeader:Could be used in any number of replicas, only for DataRegion, writings can be applied on each replica and replicated asynchronously to other replicas.
- * Ratis:Raft consensus protocol, Could be used in any number of replicas, and could be used for any region groups。
+ * SimpleConsensus: Can only be used when the replica count is 1; it is an empty implementation of the consensus protocol.
+ * IoTConsensus: Can be used with any number of replicas, but only for DataRegion; writes can be applied on any replica and are replicated asynchronously to the other replicas.
+ * RatisConsensus: An implementation of the Raft consensus protocol; can be used with any number of replicas and for any region group.
## 0.14.0-preview1 Function Map
diff --git a/docs/zh/UserGuide/Cluster/Cluster-Concept.md b/docs/zh/UserGuide/Cluster/Cluster-Concept.md
index 81c0db45ee..f550f85641 100644
--- a/docs/zh/UserGuide/Cluster/Cluster-Concept.md
+++ b/docs/zh/UserGuide/Cluster/Cluster-Concept.md
@@ -108,9 +108,9 @@ Region 是数据复制的基本单位,一个 Region 的多个副本构成了
每个副本组的多个副本之间,都通过一个具体的共识协议保证数据一致性,共识协议会将读写请求应用到多个副本上。
* 现有的共识协议
- * Standalone:仅单副本时可用,一致性协议的空实现,效率最高。
- * MultiLeader:任意副本数可用,当前仅可用于 DataRegion 的副本上,写入可以在任一副本进行,并异步复制到其他副本。
- * Ratis:Raft 协议的一种实现,任意副本数可用,当前可用于任意副本组上。
+ * SimpleConsensus:仅单副本时可用,一致性协议的空实现,效率最高。
+ * IoTConsensus:任意副本数可用,当前仅可用于 DataRegion 的副本上,写入可以在任一副本进行,并异步复制到其他副本。
+ * RatisConsensus:Raft 协议的一种实现,任意副本数可用,当前可用于任意副本组上。
## 0.14.0-Preview1 功能图
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
index ae463159f1..ac02520d10 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
@@ -69,8 +69,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
private static final String testSchemaRegionConsensusProtocolClass =
ConsensusFactory.RATIS_CONSENSUS;
protected static String originalDataRegionConsensusProtocolClass;
- private static final String testDataRegionConsensusProtocolClass =
- ConsensusFactory.MULTI_LEADER_CONSENSUS;
+ private static final String testDataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS;
protected static int originalSchemaReplicationFactor;
protected static int originalDataReplicationFactor;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
index a3a94ced3b..746fe7cc1c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
@@ -81,8 +81,7 @@ public class IoTDBConfigNodeSwitchLeaderIT {
ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
ConfigFactory.getConfig()
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
- ConfigFactory.getConfig()
- .setDataRegionConsensusProtocolClass(ConsensusFactory.MULTI_LEADER_CONSENSUS);
+ ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
originalSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 21138fcbc2..466e3ae065 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -49,7 +49,7 @@
# This parameter is unmodifiable after ConfigNode starts for the first time.
# These consensus protocols are currently supported:
# 1. org.apache.iotdb.consensus.simple.SimpleConsensus
-# 2. org.apache.iotdb.consensus.multileader.MultiLeaderConsensus
+# 2. org.apache.iotdb.consensus.iot.IoTConsensus
# 3. org.apache.iotdb.consensus.ratis.RatisConsensus
# Datatype: string
# data_region_consensus_protocol_class=org.apache.iotdb.consensus.simple.SimpleConsensus
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index a6a1119e4b..89c64f4bc9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -23,8 +23,8 @@ public enum ThreadName {
CLIENT_RPC_PROCESSOR("ClientRPC-Processor"),
CONFIGNODE_RPC_SERVICE("ConfigNodeRPC-Service"),
CONFIGNODE_RPC_PROCESSOR("ConfigNodeRPC-Processor"),
- MULTI_LEADER_CONSENSUS_RPC_SERVICE("MultiLeaderConsensusRPC-Service"),
- MULTI_LEADER_CONSENSUS_RPC_PROCESSOR("MultiLeaderConsensusRPC-Processor"),
+ IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
+ IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
MPP_DATA_EXCHANGE_RPC_SERVICE("MPPDataExchangeRPC-Service"),
MPP_DATA_EXCHANGE_RPC_PROCESSOR("MPPDataExchangeRPC-Processor"),
DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 1734f1da47..36ac71c842 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -72,7 +72,7 @@ public enum ServiceType {
FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
MPP_DATA_EXCHANGE_SERVICE("MPP Data exchange manager", "MPPDataExchangeManager"),
INTERNAL_SERVICE("Internal Service", "InternalService"),
- MULTI_LEADER_CONSENSUS_SERVICE("Multi Leader consensus Service", "MultiLeaderRPCService");
+ IOT_CONSENSUS_SERVICE("Multi Leader consensus Service", "IoTConsensusRPCService");
private final String name;
private final String jmxName;
diff --git a/pom.xml b/pom.xml
index 08f69ddb7c..0cf39c3e53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
<module>thrift</module>
<module>thrift-commons</module>
<module>thrift-confignode</module>
- <module>thrift-multi-leader-consensus</module>
+ <module>thrift-iot-consensus</module>
<module>thrift-influxdb</module>
<module>service-rpc</module>
<module>jdbc</module>
@@ -832,7 +832,7 @@
<sourceDirectory>thrift/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-commons/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-confignode/target/generated-sources/thrift</sourceDirectory>
- <sourceDirectory>thrift-multi-leader-consensus/target/generated-sources/thrift</sourceDirectory>
+ <sourceDirectory>thrift-iot-consensus/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-sync/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-cluster/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-influxdb/target/generated-sources/thrift</sourceDirectory>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 2233dbfd07..cf31d52c24 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1010,10 +1010,10 @@ public class IoTDBConfig {
/** Maximum execution time of a DriverTask */
private int driverTaskExecutionTimeSliceInMs = 100;
- /** Maximum size of wal buffer used in MultiLeader consensus. Unit: byte */
+ /** Maximum size of wal buffer used in IoTConsensus. Unit: byte */
private long throttleThreshold = 50 * 1024 * 1024 * 1024L;
- /** Maximum wait time of write cache in MultiLeader consensus. Unit: ms */
+ /** Maximum wait time of write cache in IoTConsensus. Unit: ms */
private long cacheWindowTimeInMs = 60 * 1000;
private long dataRatisConsensusLogAppenderBufferSizeMax = 4 * 1024 * 1024L;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 719052a9fb..b87e2cb968 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.config.ConsensusConfig;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig.RPC;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig.RPC;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.config.RatisConfig.Snapshot;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -65,8 +65,8 @@ public class DataRegionConsensusImpl {
new TEndPoint(
conf.getInternalAddress(), conf.getDataRegionConsensusPort()))
.setStorageDir(conf.getDataRegionConsensusDir())
- .setMultiLeaderConfig(
- MultiLeaderConfig.newBuilder()
+ .setIoTConsensusConfig(
+ IoTConsensusConfig.newBuilder()
.setRpc(
RPC.newBuilder()
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
@@ -84,7 +84,7 @@ public class DataRegionConsensusImpl {
.setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
.build())
.setReplication(
- MultiLeaderConfig.Replication.newBuilder()
+ IoTConsensusConfig.Replication.newBuilder()
.setWalThrottleThreshold(conf.getThrottleThreshold())
.setAllocateMemoryForConsensus(
conf.getAllocateMemoryForConsensus())
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 6eb7006d8a..f36429e702 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -53,7 +53,7 @@ public abstract class BaseStateMachine
PlanNode node;
if (request instanceof ByteBufferConsensusRequest) {
node = PlanNodeType.deserialize(request.serializeToByteBuffer());
- } else if (request instanceof MultiLeaderConsensusRequest) {
+ } else if (request instanceof IoTConsensusRequest) {
node = WALEntry.deserializeForConsensus(request.serializeToByteBuffer());
} else if (request instanceof PlanNode) {
node = (PlanNode) request;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index d605a2fd06..bf602a7d5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -152,7 +152,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
/**
- * This method is used for write of MultiLeader SyncLog. By this method, we can keep write order
+ * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write order
* in follower the same as the leader. And besides order insurance, we can make the
* deserialization of PlanNode to be concurrent
*/
@@ -169,8 +169,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
/**
- * This method is used for write of MultiLeader SyncLog. By this method, we can keep write order
- * in follower the same as the leader. And besides order insurance, we can make the
+ * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write
+ * order in follower the same as the leader. And besides order insurance, we can make the
* deserialization of PlanNode to be concurrent
*/
private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 1889a1e326..17be5981cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -651,7 +651,7 @@ public class StorageEngineV2 implements IService {
if (config.isClusterMode()
&& config
.getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ .equals(ConsensusFactory.IOT_CONSENSUS)) {
WALManager.getInstance()
.deleteWALNode(
region.getStorageGroupName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 576e3b22cc..3b616a2421 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3312,9 +3312,7 @@ public class DataRegion {
/** This method could only be used in multi-leader consensus */
public IWALNode getWALNode() {
- if (!config
- .getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
throw new UnsupportedOperationException();
}
// identifier should be same with getTsFileProcessor method
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
index 4a5b4169fe..72bced5f55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -26,13 +26,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool;
-import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
-import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
-import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
-import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool;
+import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
+import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
+import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
+import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
@@ -50,14 +50,14 @@ public class TestRPCClient {
.createClientManager(
new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
- private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
+ private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
public TestRPCClient() {
syncClientManager =
- new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
+ new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
.createClientManager(
- new MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory(
- new MultiLeaderConfig.Builder().build()));
+ new IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory(
+ new IoTConsensusConfig.Builder().build()));
}
public static void main(String args[]) {
@@ -68,7 +68,7 @@ public class TestRPCClient {
}
private void loadSnapshot() {
- try (SyncMultiLeaderServiceClient client =
+ try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) {
TTriggerSnapshotLoadRes res =
client.triggerSnapshotLoad(
@@ -81,7 +81,7 @@ public class TestRPCClient {
}
private void testAddPeer() {
- try (SyncMultiLeaderServiceClient client =
+ try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40012))) {
TInactivatePeerRes res =
client.inactivatePeer(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index eb2e52cf28..340f3d9c95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index b1929e1fd9..ce010a7939 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -257,10 +257,10 @@ public class DataNode implements DataNodeMBean {
dataNodeRegisterResp.globalConfig.getSchemaRegionConsensusProtocolClass());
}
- // In current implementation, only MultiLeader need separated memory from Consensus
+ // In the current implementation, only IoTConsensus needs separated memory from Consensus
if (!config
.getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ .equals(ConsensusFactory.IOT_CONSENSUS)) {
IoTDBDescriptor.getInstance().reclaimConsensusMemory();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
index 0c64406932..f96dfc34a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
@@ -62,9 +62,7 @@ public class WALManager implements IService {
private WALManager() {
if (config.isClusterMode()
- && config
- .getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
walNodesManager = new FirstCreateStrategy();
} else if (config.getMaxWalNodesNum() == 0) {
walNodesManager = new ElasticStrategy();
@@ -74,9 +72,7 @@ public class WALManager implements IService {
}
public static String getApplicantUniqueId(String storageGroupName, boolean sequence) {
- return config
- .getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)
+ return config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
? storageGroupName
: storageGroupName
+ IoTDBConstant.FILE_NAME_SEPARATOR
@@ -97,9 +93,7 @@ public class WALManager implements IService {
String applicantUniqueId, String logDirectory, long startFileVersion, long startSearchIndex) {
if (config.getWalMode() == WALMode.DISABLE
|| !config.isClusterMode()
- || !config
- .getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ || !config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
return;
}
@@ -111,9 +105,7 @@ public class WALManager implements IService {
public void deleteWALNode(String applicantUniqueId) {
if (config.getWalMode() == WALMode.DISABLE
|| !config.isClusterMode()
- || !config
- .getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ || !config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
index 2d7d3858b1..e3476b4e8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.db.wal.node.IWALNode;
import org.apache.iotdb.db.wal.node.WALNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
index c0995ad584..e700789b17 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.wal.node;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 95a15f6375..19b3294f20 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -519,7 +519,7 @@ public class WALNode implements IWALNode {
long currentIndex = buffer.getLong();
buffer.clear();
if (currentIndex == targetIndex) {
- tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
+ tmpNodes.add(new IoTConsensusRequest(buffer));
} else { // different search index, all slices found
if (!tmpNodes.isEmpty()) {
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
@@ -527,7 +527,7 @@ public class WALNode implements IWALNode {
}
// remember to add current plan node
if (currentIndex > targetIndex) {
- tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
+ tmpNodes.add(new IoTConsensusRequest(buffer));
targetIndex = currentIndex;
}
}
@@ -588,7 +588,7 @@ public class WALNode implements IWALNode {
long currentIndex = buffer.getLong();
buffer.clear();
if (currentIndex == targetIndex) {
- tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
+ tmpNodes.add(new IoTConsensusRequest(buffer));
} else { // find all slices of plan node
insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = Collections.emptyList();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index bd73f4413c..0a8240a075 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -48,7 +48,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+import static org.apache.iotdb.consensus.iot.wal.ConsensusReqReader.DEFAULT_SEARCH_INDEX;
/** This task is responsible for the recovery of one wal node. */
public class WALNodeRecoverTask implements Runnable {
@@ -94,9 +94,7 @@ public class WALNodeRecoverTask implements Runnable {
}
}
- if (!config
- .getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MULTI_LEADER_CONSENSUS)) {
+ if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
// delete this wal node folder
FileUtils.deleteDirectory(logDirectory);
logger.info(
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index c19d64ed67..517b7aeaf1 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
diff --git a/thrift-multi-leader-consensus/pom.xml b/thrift-iot-consensus/pom.xml
similarity index 97%
rename from thrift-multi-leader-consensus/pom.xml
rename to thrift-iot-consensus/pom.xml
index a55147ded5..9e0de37856 100644
--- a/thrift-multi-leader-consensus/pom.xml
+++ b/thrift-iot-consensus/pom.xml
@@ -27,7 +27,7 @@
<version>0.14.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>thrift-multi-leader-consensus</artifactId>
+ <artifactId>thrift-iot-consensus</artifactId>
<name>rpc-thrift-multi-leader-consensus</name>
<description>Rpc modules for multi leader consensus algorithm</description>
<dependencies>
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
similarity index 97%
rename from thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
rename to thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
index 28e9047178..a21d5d6d9d 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
@@ -18,7 +18,7 @@
*/
include "common.thrift"
-namespace java org.apache.iotdb.consensus.multileader.thrift
+namespace java org.apache.iotdb.consensus.iot.thrift
struct TLogBatch {
1: required list<binary> data
@@ -113,7 +113,7 @@ struct TCleanupTransferredSnapshotRes {
1: required common.TSStatus status
}
-service MultiLeaderConsensusIService {
+service IoTConsensusIService {
TSyncLogRes syncLog(TSyncLogReq req)
TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
TActivatePeerRes activatePeer(TActivatePeerReq req)