You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/05 10:38:53 UTC
[iotdb] 01/01: finish
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch datanode_consensus_init
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a4204fd360866526b164f2844a6555a80e631255
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 5 18:38:26 2022 +0800
finish
---
.../resources/conf/iotdb-confignode.properties | 21 ++-
.../iotdb/confignode/conf/ConfigNodeConf.java | 28 ++--
.../confignode/conf/ConfigNodeDescriptor.java | 39 +++---
.../iotdb/confignode/manager/ConsensusManager.java | 49 ++-----
consensus/README.md | 126 +++++++++++------
consensus/pom.xml | 2 +-
.../apache/iotdb/consensus/IConsensusFactory.java | 57 ++++++++
.../iotdb/consensus/ratis/RatisConsensus.java | 152 ++++++++-------------
.../consensus/standalone/StandAloneConsensus.java | 20 ++-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 20 ++-
.../standalone/StandAloneConsensusTest.java | 33 +++--
.../org/apache/iotdb/commons/cluster/Endpoint.java | 10 ++
.../apache/iotdb/commons/utils/CommonUtils.java | 10 +-
.../resources/conf/iotdb-engine.properties | 19 ++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 +++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +
.../iotdb/db/consensus/ConsensusExample.java | 30 ++--
.../apache/iotdb/db/consensus/ConsensusImpl.java | 66 +++++++++
.../statemachine/DataRegionStateMachine.java | 4 +-
.../statemachine/SchemaRegionStateMachine.java | 4 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 16 +++
.../java/org/apache/iotdb/db/service/DataNode.java | 14 +-
22 files changed, 505 insertions(+), 261 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 801bf17b44..3aaf61eddc 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -57,18 +57,25 @@ config_node_rpc_port=22277
### consensus protocol configuration
####################
-# ConfigNodeGroup consensus protocol type
-# These consensus protocol are currently supported:
-# 1. standalone(No protocol, only supports stand-alone machine)
-# 2. ratis(Raft protocol)
+# ConfigNode consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
# Datatype: String
-# consensus_type=standalone
+# config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
# Used for building the ConfigNode consensus group
# all config node address and internal port, use comma to distinguish
# every node should have the same config_node_address_lists
# Datatype: String
-# config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+# config_node_group_address_list=0.0.0.0:22278
+
+# DataNode consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# Datatype: String
+# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
####################
### DeviceGroup Configuration
@@ -79,7 +86,6 @@ config_node_rpc_port=22277
# device_group_count=10000
# DeviceGroup hash algorithm
-# Datatype: String
# These hashing algorithms are currently supported:
# 1. org.apache.iotdb.commons.hash.BKDRHashExecutor(Default)
# 2. org.apache.iotdb.commons.hash.APHashExecutor
@@ -87,6 +93,7 @@ config_node_rpc_port=22277
# 4. org.apache.iotdb.commons.hash.SDBMHashExecutor
# Also, if you want to implement your own hash algorithm, you can inherit the DeviceGroupHashExecutor class and
# modify this parameter to correspond to your Java class
+# Datatype: String
# device_group_hash_executor_class=org.apache.iotdb.commons.hash.BKDRHashExecutor
####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index c94ebe05a6..f21bb768d8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.consensus.common.ConsensusType;
import org.apache.iotdb.rpc.RpcUtils;
import java.io.File;
@@ -33,14 +32,17 @@ public class ConfigNodeConf {
/** used for communication between data node and config node */
private int rpcPort = 22277;
- /** used for communication between data node and data node */
+ /** used for communication between config node and config node */
private int internalPort = 22278;
- /** ConfigNodeGroup consensus protocol */
- private ConsensusType consensusType = ConsensusType.STANDALONE;
+ /** ConfigNode consensus protocol */
+ private String configNodeConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+ private String dataNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
/** Used for building the ConfigNode consensus group */
- private Endpoint[] configNodeGroupAddressList = null;
+ private Endpoint[] configNodeGroupAddressList;
/** Number of DeviceGroups per StorageGroup */
private int deviceGroupCount = 10000;
@@ -199,12 +201,20 @@ public class ConfigNodeConf {
this.consensusDir = consensusDir;
}
- public ConsensusType getConsensusType() {
- return consensusType;
+ public String getConfigNodeConsensusProtocolClass() {
+ return configNodeConsensusProtocolClass;
+ }
+
+ public void setConfigNodeConsensusProtocolClass(String configNodeConsensusProtocolClass) {
+ this.configNodeConsensusProtocolClass = configNodeConsensusProtocolClass;
+ }
+
+ public String getDataNodeConsensusProtocolClass() {
+ return dataNodeConsensusProtocolClass;
}
- public void setConsensusType(ConsensusType consensusType) {
- this.consensusType = consensusType;
+ public void setDataNodeConsensusProtocolClass(String dataNodeConsensusProtocolClass) {
+ this.dataNodeConsensusProtocolClass = dataNodeConsensusProtocolClass;
}
public Endpoint[] getConfigNodeGroupAddressList() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index f6f035d987..4f2590192d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.consensus.common.ConsensusType;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.utils.CommonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,9 +123,13 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"config_node_internal_port", String.valueOf(conf.getInternalPort()))));
- conf.setConsensusType(
- ConsensusType.getConsensusType(
- properties.getProperty("consensus_type", String.valueOf(conf.getConsensusType()))));
+ conf.setConfigNodeConsensusProtocolClass(
+ properties.getProperty(
+ "config_node_consensus_protocol_class", conf.getConfigNodeConsensusProtocolClass()));
+
+ conf.setDataNodeConsensusProtocolClass(
+ properties.getProperty(
+ "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass()));
conf.setRpcAdvancedCompressionEnable(
Boolean.parseBoolean(
@@ -175,25 +180,15 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"data_region_count", String.valueOf(conf.getDataRegionCount()))));
- String addresses = properties.getProperty("config_node_group_address_list", null);
- if (addresses != null) {
- String[] addressList = addresses.split(",");
- Endpoint[] endpointList = new Endpoint[addressList.length];
- for (int i = 0; i < addressList.length; i++) {
- String[] ipPort = addressList[i].split(":");
- if (ipPort.length != 2) {
- throw new IOException(
- String.format(
- "Parsing parameter config_node_group_address_list error. "
- + "The %d-th address must format to ip:port, but currently is %s",
- i, addressList[i]));
- }
- endpointList[i] = new Endpoint(ipPort[0], Integer.parseInt(ipPort[1]));
- }
- conf.setConfigNodeGroupAddressList(endpointList);
- }
+ String addresses = properties.getProperty("config_node_group_address_list", "0.0.0.0:22278");
- } catch (IOException e) {
+ String[] addressList = addresses.split(",");
+ Endpoint[] endpointList = new Endpoint[addressList.length];
+ for (int i = 0; i < addressList.length; i++) {
+ endpointList[i] = CommonUtils.parseNodeUrl(addressList[i]);
+ }
+ conf.setConfigNodeGroupAddressList(endpointList);
+ } catch (IOException | BadNodeUrlException e) {
LOGGER.warn("Couldn't load ConfigNode conf file, use default config", e);
} finally {
conf.updatePath();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 3261c6dde0..d48c264790 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -27,11 +27,10 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.ratis.RatisConsensus;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
/** ConsensusManager maintains consensus class, request will redirect to consensus layer */
@@ -90,42 +88,19 @@ public class ConsensusManager {
// There is only one ConfigNodeGroup
consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
- // Implement specific consensus
- switch (conf.getConsensusType()) {
- case STANDALONE:
- constructStandAloneConsensus();
- break;
- case RATIS:
- constructRatisConsensus();
- break;
- default:
- throw new IllegalArgumentException(
- "Start ConfigNode failed, unrecognized ConsensusType: "
- + conf.getConsensusType().getTypeName());
- }
- }
-
- private void constructStandAloneConsensus() throws IOException {
- // Standalone consensus
- consensusImpl = new StandAloneConsensus(id -> new PartitionRegionStateMachine());
- consensusImpl.start();
-
- // Standalone ConsensusGroup
- consensusImpl.addConsensusGroup(
- consensusGroupId,
- Collections.singletonList(
- new Peer(
- consensusGroupId, new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))));
- }
-
- private void constructRatisConsensus() throws IOException {
// Ratis consensus local implement
consensusImpl =
- RatisConsensus.newBuilder()
- .setEndpoint(new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))
- .setStateMachineRegistry(id -> new PartitionRegionStateMachine())
- .setStorageDir(new File(conf.getConsensusDir()))
- .build();
+ IConsensusFactory.getConsensusImpl(
+ conf.getConfigNodeConsensusProtocolClass(),
+ new Endpoint(conf.getRpcAddress(), conf.getInternalPort()),
+ new File(conf.getConsensusDir()),
+ gid -> new PartitionRegionStateMachine())
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ IConsensusFactory.CONSTRUCT_FAILED_MSG,
+ conf.getConfigNodeConsensusProtocolClass())));
consensusImpl.start();
// Build ratis group from user properties
diff --git a/consensus/README.md b/consensus/README.md
index 7fc51e0d8d..4c731a6322 100644
--- a/consensus/README.md
+++ b/consensus/README.md
@@ -20,104 +20,144 @@
-->
# Overview
+
The Consensus Package provides the Consensus Layer's service definitions and its implementations.
-
+
### What is Consensus Layer used for?
-Generally, We maintain multiple copies of application data for the purpose of fault-tolerance and data-integrity.
-There are variety of consensus algorithms to manage multiple copies of data, which differs in levels of data consistency and performance.
-Each consensus algorithm may have multiple industrial implementations.
-The Consensus Layer aims to hide all complications behind different consensus algorithms and implementations, providing higher level of abstraction to the user.
+
+Generally, we maintain multiple copies of application data for the purpose of fault-tolerance and
+data-integrity. There are a variety of consensus algorithms to manage multiple copies of data, which
+differ in levels of data consistency and performance. Each consensus algorithm may have multiple
+industrial implementations. The Consensus Layer aims to hide all complications behind different
+consensus algorithms and implementations, providing a higher level of abstraction to the user.
# Consensus Layer
+
### Basic Concepts
* `IStateMachine` is the user application that manages a local copy of data.
-* `Peer` is the smallest consensus unit (inside a process), which holds a `IStateMachine` internally.
+* `Peer` is the smallest consensus unit (inside a process), which holds a `IStateMachine`
+ internally.
* `ConsensusGroup` is a group of `Peer` all managing the same copy of data.
-* `IConsensus` interface defines the basic functionality provided by Consensus Layer.
+* `IConsensus` interface defines the basic functionality provided by Consensus Layer.
+* `IConsensusFactory` is the only factory class exposed to other modules to create a consensus layer
+ implementation.
-User application can create a `ConsensusGroup` with k `Peer` to store data,
-i.e. that there will be k copies of data.
+User application can create a `ConsensusGroup` with k `Peer` to store data, i.e. that there will be
+k copies of data.
-When write data into a `ConsensusGroup` using `IConsensus::write`, it will be sent to the group leader's `IStateMachine::write` .
-The leader makes decision about this write operation first,
-then applies write-operation to the local statemachine using `IStateMachine::write`,
-and forward this operation to other members' `IStateMachine::write` in the same group
+When writing data into a `ConsensusGroup` using `IConsensus::write`, it will be sent to the group
+leader's `IStateMachine::write`. The leader makes a decision about this write operation first, then
+applies the write operation to the local state machine using `IStateMachine::write`, and forwards
+this operation to other members' `IStateMachine::write` in the same group.
### How to use
+
1. Define the `IStateMachine` to manage local copy of data.
2. Define the `IConsensusRequest` to customize request format
3. Define the `DataSet` to customize the response format
-4. Pick a concrete `IConsensus` implementation as the backbone consensus algorithm
-
-
+4. Select a specific "IConsensus" class name and call `IConsensusFactory.getConsensusImpl()` to
+ instantiate the corresponding consensus protocol
# Ratis Consensus Implementation
-RatisConsensus is a multi-raft implementation of `IConsensus` protocol. It is based on [Apache Ratis](https://ratis.apache.org/).
+RatisConsensus is a multi-raft implementation of `IConsensus` protocol. It is based
+on [Apache Ratis](https://ratis.apache.org/).
+
+### 1. build and start a RatisConsensusImpl
-### 1. build and start a RatisConsensusService
```java
-IConsensus consensusService = RatisConsensus.newBuilder()
- .setEndpoint(peers.get(i).getEndpoint())
- .setStateMachineRegistry(groupId -> new IntegerCounter())
- .setStorageDir(peersStorage.get(i))
- .build();
-consensusService.start();
+IConsensus consensusImpl =
+ IConsensusFactory.getConsensusImpl(
+ "org.apache.iotdb.consensus.ratis.RatisConsensus",
+ new Endpoint(conf.getRpcAddress(), conf.getInternalPort()),
+ new File(conf.getConsensusDir()),
+ gid -> new PartitionRegionStateMachine())
+ .orElseThrow(() ->
+ new IllegalArgumentException(
+ String.format(
+ IConsensusFactory.CONSTRUCT_FAILED_MSG,
+ "org.apache.iotdb.consensus.ratis.RatisConsensus")));
+
+consensusImpl.start();
```
-* `endpoint` is the communication endpoint for this consensus service.
-* `StateMachineRegistery`
-* `StoreageDir` specifies the location to store RaftLog. Assign a fix location so that the RatisConsensus knows where to recover when crashes and restarts.
+
+* `endpoint` is the communication endpoint for this consensusImpl.
+* `StateMachineRegistry` supplies the `IStateMachine` instance for a given consensus group id.
+* `StorageDir` specifies the location to store RaftLog. Assign a fixed location so that the
+ RatisConsensus knows where to recover when it crashes and restarts.
### 2. assign local RatisConsensus a new Group
+
```java
ConsensusGroup group = new ConsensusGroup(...);
-response = service.addConsensusGroup(group.getGroupId(), group.getPeers());
+response = consensusImpl.addConsensusGroup(group.getGroupId(),group.getPeers());
```
-The underling Raft Service will initialize its states, and reaching out to other peers to elect the raft leader.
+
+The underlying consensusImpl will initialize its state and reach out to other peers to elect the
+raft leader.
**Notice**: this request may fail. It's caller's responsibility to retry / rollback.
### 3. change group configuration
+
#### (1) remove a member
+
suppose now the group contains peer[0,1,2], and we want to remove[1,2] from this group
+
```java
// the following code should be called in peer both 1 & 2
// first use removePeer to inform the group leader of configuration change
-service.removePeer(gid, myself);
+consensusImpl.removePeer(gid,myself);
// then use removeConsensusGroup to clean up local states and data
-service.removeConsensusGroup(gid);
+consensusImpl.removeConsensusGroup(gid);
```
-**Notice**: either of `removePeer` or `removeConsensusGroup` may fail.
-It's caller's responsibility to retry and make these two calls atomic.
+
+**Notice**: either of `removePeer` or `removeConsensusGroup` may fail. It's caller's responsibility
+to retry and make these two calls atomic.
+
#### (2) add a member
-adding a new member is similar to removing a member except that you should call `addConsensusGroup` first and then `addPeer`
+
+adding a new member is similar to removing a member except that you should call `addConsensusGroup`
+first and then `addPeer`
+
```java
// the following code should be called in peer both 1 & 2
// first addConsensusGroup to initialize local states
-service.addConsensusGroup(gid);
+consensusImpl.addConsensusGroup(gid);
// then use addPeer to inform the previous group members of joining a new member
-service.addPeer(gid, myself);
+consensusImpl.addPeer(gid,myself);
```
+
#### (3) add/remove multiple members
+
```java
// pre. For each member newly added, call addConsensusGroup locally to initialize
-serivice.changePeer(group.getGroupId(), newGroupmember);
+consensusImpl.changePeer(group.getGroupId(),newGroupmember);
// after. For each member removed, call removeConsensusGroup locally to clean up
```
+
**Notice**: the old group and the new group must overlap in at least one member.
+
### 4. write data
+
```java
-ConsensusWriteResponse response = service.write(gid, request)
-if (response.isSuccess() && response.getStates().code() == 200) {...}
+ConsensusWriteResponse response = consensusImpl.write(gid,request)
+if(response.isSuccess() && response.getStates().code() == 200){
+ ...
+}
```
+
### 5. read data
+
```java
-ConsensusReadResponse response = consensus.read(gid, request);
-if (response.isSuccess()) {
- MyDataSet result = (MyDataSet) response.getDataset();
+ConsensusReadResponse response = consensusImpl.read(gid,request);
+if(response.isSuccess()){
+ MyDataSet result=(MyDataSet)response.getDataset();
}
```
-**NOTICE**: currently in RatisConsensus, read will direct read the local copy. Thus, the result may be stale and not linearizable!
+
+**NOTICE**: currently in RatisConsensus, read will directly read the local copy. Thus, the result
+may be stale and not linearizable!
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 8f6646eae6..f668d8b4bd 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -54,7 +54,7 @@
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>node-commons</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java
new file mode 100644
index 0000000000..9f36de4396
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensusFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Optional;
+
+public class IConsensusFactory {
+ public static final String CONSTRUCT_FAILED_MSG =
+ "Construct consensusImpl failed, Please check your consensus className %s";
+
+ private static final Logger logger = LoggerFactory.getLogger(IConsensusFactory.class);
+
+ public static Optional<IConsensus> getConsensusImpl(
+ String className, Endpoint endpoint, File storageDir, IStateMachine.Registry registry) {
+ try {
+ Class<?> executor = Class.forName(className);
+ Constructor<?> executorConstructor =
+ executor.getDeclaredConstructor(Endpoint.class, File.class, IStateMachine.Registry.class);
+ executorConstructor.setAccessible(true);
+ return Optional.of(
+ (IConsensus) executorConstructor.newInstance(endpoint, storageDir, registry));
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ logger.error("Couldn't Construct IConsensus class: {}", className, e);
+ }
+ return Optional.empty();
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 8e70f9653a..68094643cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -73,7 +73,7 @@ import java.util.stream.Collectors;
*
* <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
*/
-public class RatisConsensus implements IConsensus {
+class RatisConsensus implements IConsensus {
// the unique net communication endpoint
private final RaftPeer myself;
@@ -89,41 +89,40 @@ public class RatisConsensus implements IConsensus {
private static final int LEADER_PRIORITY = 1;
private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
- /**
- * This function will use the previous client for groupId to query the latest group info It will
- * update the new group info into the groupMap and rebuild its client
- *
- * @throws ConsensusGroupNotExistException when cannot get the group info
- */
- private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
- throws ConsensusGroupNotExistException {
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftClient current = clientMap.get(raftGroupId);
- try {
- GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
- if (!reply.isSuccess()) {
- throw new ConsensusGroupNotExistException(groupId);
- }
+ public RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
+ throws IOException {
- raftGroupMap.put(raftGroupId, reply.getGroup());
- buildClientAndCache(raftGroupMap.get(raftGroupId));
- } catch (IOException e) {
- throw new ConsensusGroupNotExistException(groupId);
- }
- }
+ this.clientMap = new ConcurrentHashMap<>();
+ this.raftGroupMap = new ConcurrentHashMap<>();
+ this.localFakeId = ClientId.randomId();
+ this.localFakeCallId = new AtomicLong(0);
- private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
- throws RatisRequestFailedException {
- RaftClient client = clientMap.get(raftGroupId);
- // notify the group leader of configuration change
- RaftClientReply reply = null;
- try {
- reply = client.admin().setConfiguration(peers);
- } catch (IOException e) {
- throw new RatisRequestFailedException(e);
+ // create a RaftPeer as endpoint of comm
+ String address = Utils.IPAddress(endpoint);
+ myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+
+ RaftProperties properties = new RaftProperties();
+
+ // set the storage directory (different for each peer) in RaftProperty object
+ if (ratisStorageDir == null || !ratisStorageDir.isDirectory()) {
+ ratisStorageDir = new File("./" + myself.getId().toString());
}
- return reply;
+ RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
+
+ // set the port which server listen to in RaftProperty object
+ final int port = NetUtils.createSocketAddr(address).getPort();
+ GrpcConfigKeys.Server.setPort(properties, port);
+
+ server =
+ RaftServer.newBuilder()
+ .setServerId(myself.getId())
+ .setProperties(properties)
+ .setStateMachineRegistry(
+ raftGroupId ->
+ new ApplicationStateMachineProxy(
+ registry.apply(Utils.toConsensusGroupId(raftGroupId))))
+ .build();
}
@Override
@@ -517,71 +516,40 @@ public class RatisConsensus implements IConsensus {
}
}
- private RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
- throws IOException {
-
- this.clientMap = new ConcurrentHashMap<>();
- this.raftGroupMap = new ConcurrentHashMap<>();
- this.localFakeId = ClientId.randomId();
- this.localFakeCallId = new AtomicLong(0);
-
- // create a RaftPeer as endpoint of comm
- String address = Utils.IPAddress(endpoint);
- myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+ /**
+ * This function will use the previous client for groupId to query the latest group info It will
+ * update the new group info into the groupMap and rebuild its client
+ *
+ * @throws ConsensusGroupNotExistException when cannot get the group info
+ */
+ private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+ throws ConsensusGroupNotExistException {
+ RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftClient current = clientMap.get(raftGroupId);
+ try {
+ GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
- RaftProperties properties = new RaftProperties();
+ if (!reply.isSuccess()) {
+ throw new ConsensusGroupNotExistException(groupId);
+ }
- // set the storage directory (different for each peer) in RaftProperty object
- if (ratisStorageDir == null || !ratisStorageDir.isDirectory()) {
- ratisStorageDir = new File("./" + myself.getId().toString());
+ raftGroupMap.put(raftGroupId, reply.getGroup());
+ buildClientAndCache(raftGroupMap.get(raftGroupId));
+ } catch (IOException e) {
+ throw new ConsensusGroupNotExistException(groupId);
}
- RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
-
- // set the port which server listen to in RaftProperty object
- final int port = NetUtils.createSocketAddr(address).getPort();
- GrpcConfigKeys.Server.setPort(properties, port);
-
- server =
- RaftServer.newBuilder()
- .setServerId(myself.getId())
- .setProperties(properties)
- .setStateMachineRegistry(
- raftGroupId ->
- new ApplicationStateMachineProxy(
- registry.apply(Utils.toConsensusGroupId(raftGroupId))))
- .build();
- }
-
- public static Builder newBuilder() {
- return new Builder();
}
- public static class Builder {
- private Endpoint endpoint;
- private IStateMachine.Registry registry;
- private File storageDir;
-
- public Builder() {
- storageDir = null;
- }
-
- public Builder setEndpoint(Endpoint endpoint) {
- this.endpoint = endpoint;
- return this;
- }
-
- public Builder setStateMachineRegistry(IStateMachine.Registry registry) {
- this.registry = registry;
- return this;
- }
-
- public Builder setStorageDir(File dir) {
- this.storageDir = dir;
- return this;
- }
-
- public RatisConsensus build() throws IOException {
- return new RatisConsensus(endpoint, storageDir, registry);
+ private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+ throws RatisRequestFailedException {
+ RaftClient client = clientMap.get(raftGroupId);
+ // notify the group leader of configuration change
+ RaftClientReply reply;
+ try {
+ reply = client.admin().setConfiguration(peers);
+ } catch (IOException e) {
+ throw new RatisRequestFailedException(e);
}
+ return reply;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 6f1582a268..91a767055b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.standalone;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.DataSet;
@@ -31,8 +32,10 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -41,19 +44,25 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
- * A simple single replica consensus implementation.
+ * A simple consensus implementation, which can be used when replicaNum is 1.
+ *
+ * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
*
* <p>any module can use `IConsensus consensusImpl = new StandAloneConsensus(id -> new
* EmptyStateMachine());` to perform an initialization implementation.
*/
-public class StandAloneConsensus implements IConsensus {
+class StandAloneConsensus implements IConsensus {
+ private final Endpoint thisNode;
+ private final File storageDir;
private final IStateMachine.Registry registry;
- private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap;
+ private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
+ new ConcurrentHashMap<>();
- public StandAloneConsensus(IStateMachine.Registry registry) {
+ public StandAloneConsensus(Endpoint thisNode, File storageDir, Registry registry) {
+ this.thisNode = thisNode;
+ this.storageDir = storageDir;
this.registry = registry;
- this.stateMachineMap = new ConcurrentHashMap<>();
}
@Override
@@ -136,6 +145,7 @@ public class StandAloneConsensus implements IConsensus {
v.stop();
return null;
});
+
if (!exist.get()) {
return ConsensusGenericResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 2558ad2579..581fdd55c5 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.GroupType;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
@@ -52,6 +53,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public class RatisConsensusTest {
+ private static final String RATIS_CLASS_NAME = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
private static class TestDataSet implements DataSet {
private int number;
@@ -65,7 +68,7 @@ public class RatisConsensusTest {
}
private static class TestRequest {
- private int cmd;
+ private final int cmd;
public TestRequest(ByteBuffer buffer) {
cmd = buffer.getInt();
@@ -136,11 +139,16 @@ public class RatisConsensusTest {
servers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
servers.add(
- RatisConsensus.newBuilder()
- .setEndpoint(peers.get(i).getEndpoint())
- .setStateMachineRegistry(groupId -> new IntegerCounter())
- .setStorageDir(peersStorage.get(i))
- .build());
+ IConsensusFactory.getConsensusImpl(
+ RATIS_CLASS_NAME,
+ peers.get(i).getEndpoint(),
+ peersStorage.get(i),
+ groupId -> new IntegerCounter())
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ IConsensusFactory.CONSTRUCT_FAILED_MSG, RATIS_CLASS_NAME))));
servers.get(i).start();
}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index ded008960d..ddb86f817a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.GroupType;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -40,6 +41,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -52,6 +54,8 @@ import static org.junit.Assert.assertTrue;
public class StandAloneConsensusTest {
+ private static final String STANDALONE_CONSENSUS_CLASS_NAME =
+ "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
private IConsensus consensusImpl;
private final TestEntry entry1 = new TestEntry(0);
private final ByteBufferConsensusRequest entry2 =
@@ -108,16 +112,25 @@ public class StandAloneConsensusTest {
@Before
public void setUp() throws Exception {
consensusImpl =
- new StandAloneConsensus(
- gid -> {
- switch (gid.getType()) {
- case SchemaRegion:
- return new TestStateMachine(true);
- case DataRegion:
- return new TestStateMachine(false);
- }
- return new EmptyStateMachine();
- });
+ IConsensusFactory.getConsensusImpl(
+ STANDALONE_CONSENSUS_CLASS_NAME,
+ new Endpoint("localhost", 6667),
+ new File("./"),
+ gid -> {
+ switch (gid.getType()) {
+ case SchemaRegion:
+ return new TestStateMachine(true);
+ case DataRegion:
+ return new TestStateMachine(false);
+ }
+ return new EmptyStateMachine();
+ })
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ IConsensusFactory.CONSTRUCT_FAILED_MSG,
+ STANDALONE_CONSENSUS_CLASS_NAME)));
consensusImpl.start();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
index ff6e8b118b..25d7b84d0f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/Endpoint.java
@@ -39,10 +39,20 @@ public class Endpoint {
return ip;
}
+ /**
+ * Replaces the ip stored in this endpoint.
+ *
+ * <p>NOTE(review): this makes the object mutable; if equals/hashCode depend on the ip, changing
+ * it while the endpoint is used as a hash key would break lookups - confirm against callers.
+ *
+ * @param ip the new ip, stored as given
+ * @return this endpoint, so that setter calls can be chained
+ */
+ public Endpoint setIp(String ip) {
+ this.ip = ip;
+ return this;
+ }
+
public int getPort() {
return port;
}
+ /**
+ * Replaces the port stored in this endpoint.
+ *
+ * @param port the new port, stored as given (no range check is done here)
+ * @return this endpoint, so that setter calls can be chained
+ */
+ public Endpoint setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
public void serializeImpl(ByteBuffer buffer) {
byte[] bytes = ip.getBytes();
buffer.putInt(bytes.length);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
index 3d9995edbc..e9e170232c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +31,8 @@ import java.util.List;
public class CommonUtils {
private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
- public static EndPoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
- EndPoint result = new EndPoint();
+ public static Endpoint parseNodeUrl(String nodeUrl) throws BadNodeUrlException {
+ Endpoint result = new Endpoint();
String[] split = nodeUrl.split(":");
if (split.length != 2) {
logger.warn("Bad node url: {}", nodeUrl);
@@ -49,8 +49,8 @@ public class CommonUtils {
return result;
}
- public static List<EndPoint> parseNodeUrls(List<String> nodeUrls) throws BadNodeUrlException {
- List<EndPoint> result = new ArrayList<>();
+ public static List<Endpoint> parseNodeUrls(List<String> nodeUrls) throws BadNodeUrlException {
+ List<Endpoint> result = new ArrayList<>();
for (String url : nodeUrls) {
result.add(parseNodeUrl(url));
}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index d6598a5d14..ae84c4f2f1 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -33,9 +33,14 @@ rpc_port=6667
internal_ip=127.0.0.1
# Datatype: int
-# port for communication between cluster nodes.
+# port for coordinator's communication between cluster nodes.
internal_port=9003
+
+# Datatype: int
+# port for consensus's communication between cluster nodes.
+consensus_port=40010
+
# comma-separated {IP/DOMAIN}:internal_port pairs
# Data nodes store config nodes ip and port to communicate with config nodes.
# Several nodes will be picked randomly to send the request, the number of nodes
@@ -140,6 +145,18 @@ config_nodes=127.0.0.1:22277
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# wal_dir=data/wal
+# consensus dir
+# If this property is unset, the system will save the data in the default relative path directory under the IoTDB folder (i.e., %IOTDB_HOME%/data/consensus).
+# If it is absolute, the system will save the data in the exact location it points to.
+# If it is relative, the system will save the data in the relative path directory it indicates under the IoTDB folder.
+# Note: If consensus_dir is assigned an empty string (i.e., zero-size), it will be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data\\consensus
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data/consensus
+
# TSFile storage file system. Currently, Tsfiles are supported to be stored in LOCAL file system or HDFS.
# Datatype: FSType
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 580d8f7552..a78d38baa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -260,6 +260,9 @@ public class IoTDBConfig {
/** Wal directory. */
private String walDir = DEFAULT_BASE_DIR + File.separator + "wal";
+ /** Consensus directory. */
+ private String consensusDir = DEFAULT_BASE_DIR + File.separator + "consensus";
+
/** Maximum MemTable number. Invalid when enableMemControl is true. */
private int maxMemtableNumber = 0;
@@ -820,12 +823,22 @@ public class IoTDBConfig {
/** Internal ip for data node */
private String internalIp;
- /** Internal port of data node */
+ /** Internal port for coordinator */
private int internalPort = 9003;
+ /** Internal port for consensus protocol */
+ private int consensusPort = 40010;
+
/** The max time of data node waiting to join into the cluster */
private long joinClusterTimeOutMs = TimeUnit.SECONDS.toMillis(60);
+ /**
+ * The consensus protocol class. The Datanode should communicate with ConfigNode on startup and
+ * set this variable so that the correct class name can be obtained later when the consensus layer
+ * singleton is initialized
+ */
+ private String consensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
public IoTDBConfig() {
try {
internalIp = InetAddress.getLocalHost().getHostAddress();
@@ -937,6 +950,7 @@ public class IoTDBConfig {
syncDir = addHomeDir(syncDir);
tracingDir = addHomeDir(tracingDir);
walDir = addHomeDir(walDir);
+ consensusDir = addHomeDir(consensusDir);
indexRootFolder = addHomeDir(indexRootFolder);
extDir = addHomeDir(extDir);
udfDir = addHomeDir(udfDir);
@@ -1142,6 +1156,14 @@ public class IoTDBConfig {
this.walDir = walDir;
}
+ /** Returns the currently configured consensus directory path. */
+ public String getConsensusDir() {
+ return consensusDir;
+ }
+
+ /** Sets the consensus directory path; the value is stored exactly as given. */
+ public void setConsensusDir(String consensusDir) {
+ this.consensusDir = consensusDir;
+ }
+
public String getExtDir() {
return extDir;
}
@@ -2579,6 +2601,14 @@ public class IoTDBConfig {
this.internalPort = internalPort;
}
+ /** Returns the internal port used by the consensus protocol. */
+ public int getConsensusPort() {
+ return consensusPort;
+ }
+
+ /** Sets the internal port used by the consensus protocol; no range check is done here. */
+ public void setConsensusPort(int consensusPort) {
+ this.consensusPort = consensusPort;
+ }
+
public long getJoinClusterTimeOutMs() {
return joinClusterTimeOutMs;
}
@@ -2586,4 +2616,12 @@ public class IoTDBConfig {
public void setJoinClusterTimeOutMs(long joinClusterTimeOutMs) {
this.joinClusterTimeOutMs = joinClusterTimeOutMs;
}
+
+ /** Returns the class name of the consensus protocol implementation to use. */
+ public String getConsensusProtocolClass() {
+ return consensusProtocolClass;
+ }
+
+ /** Sets the class name of the consensus protocol implementation; stored as given. */
+ public void setConsensusProtocolClass(String consensusProtocolClass) {
+ this.consensusProtocolClass = consensusProtocolClass;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e384b5c2b1..0e2934ebf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -243,6 +243,8 @@ public class IoTDBDescriptor {
conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir()));
+ conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+
int mlogBufferSize =
Integer.parseInt(
properties.getProperty(
@@ -1472,6 +1474,10 @@ public class IoTDBDescriptor {
conf.setInternalPort(
Integer.parseInt(
properties.getProperty("internal_port", Integer.toString(conf.getInternalPort()))));
+
+ conf.setConsensusPort(
+ Integer.parseInt(
+ properties.getProperty("consensus_port", Integer.toString(conf.getConsensusPort()))));
}
/** Get default encode algorithm by data type */
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
index 9b1a2ed498..e9ecb69131 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
@@ -23,10 +23,9 @@ import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.GroupType;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
@@ -35,6 +34,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -43,16 +43,22 @@ public class ConsensusExample {
public static void main(String[] args) throws IllegalPathException, IOException {
IConsensus consensusImpl =
- new StandAloneConsensus(
- id -> {
- switch (id.getType()) {
- case SchemaRegion:
- return new SchemaRegionStateMachine();
- case DataRegion:
- return new DataRegionStateMachine();
- }
- return new EmptyStateMachine();
- });
+ IConsensusFactory.getConsensusImpl(
+ "org.apache.iotdb.consensus.standalone.StandAloneConsensus",
+ new Endpoint("localhost", 6667),
+ new File("./"),
+ gid -> {
+ switch (gid.getType()) {
+ case SchemaRegion:
+ return new SchemaRegionStateMachine();
+ case DataRegion:
+ return new DataRegionStateMachine();
+ }
+ throw new IllegalArgumentException(
+ String.format("Unexpected consensusGroup %s", gid));
+ })
+ .orElseThrow(
+ () -> new IllegalArgumentException(IConsensusFactory.CONSTRUCT_FAILED_MSG));
consensusImpl.start();
InsertRowPlan plan = getInsertRowPlan();
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
new file mode 100644
index 0000000000..9fd76a7585
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus;
+
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+
+import java.io.File;
+
+/**
+ * Static access point to the {@link IConsensus} implementation used by this node.
+ *
+ * <p>The instance is built through {@link IConsensusFactory} from the protocol class name,
+ * internal ip, consensus port and consensus directory read from {@link IoTDBConfig}. Creation
+ * happens in the static initializer of the nested holder class, i.e. the first time {@link
+ * #getInstance()} is called.
+ */
+public class ConsensusImpl {
+
+  private ConsensusImpl() {}
+
+  /**
+   * Returns the shared consensus instance.
+   *
+   * <p>This method is static on purpose: the only constructor is private, so an instance method
+   * could never be invoked from outside this class.
+   *
+   * @return the single {@link IConsensus} created by the holder class
+   */
+  public static IConsensus getInstance() {
+    return ConsensusImplHolder.INSTANCE;
+  }
+
+  private static class ConsensusImplHolder {
+
+    private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+    // The registry lambda maps a consensus group to the state machine kept by StorageEngine;
+    // any group type other than SchemaRegion/DataRegion is rejected.
+    private static final IConsensus INSTANCE =
+        IConsensusFactory.getConsensusImpl(
+                conf.getConsensusProtocolClass(),
+                new Endpoint(conf.getInternalIp(), conf.getConsensusPort()),
+                new File(conf.getConsensusDir()),
+                gid -> {
+                  switch (gid.getType()) {
+                    case SchemaRegion:
+                      return StorageEngine.getInstance().getOrCreateSchemaRegionStateMachine(gid);
+                    case DataRegion:
+                      return StorageEngine.getInstance().getOrCreateDataRegionStateMachine(gid);
+                  }
+                  throw new IllegalArgumentException(
+                      String.format("Unexpected consensusGroup %s", gid));
+                })
+            // the factory returns an empty Optional when the class cannot be constructed
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            IConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            conf.getConsensusProtocolClass())));
+
+    private ConsensusImplHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4ef3ad8f07..ff59f952c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -39,13 +39,13 @@ public class DataRegionStateMachine extends BaseStateMachine {
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
- logger.info("Execute write plan in DataRegionStateMachine : {}", fragmentInstance);
+ logger.info("Execute write plan in DataRegionStateMachine");
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
protected DataSet read(FragmentInstance fragmentInstance) {
- logger.info("Execute read plan in DataRegionStateMachine: {}", fragmentInstance);
+ logger.info("Execute read plan in DataRegionStateMachine");
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 628874f1d3..3c3be5d208 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -39,13 +39,13 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
- logger.info("Execute write plan in SchemaRegionStateMachine : {}", fragmentInstance);
+ logger.info("Execute write plan in SchemaRegionStateMachine");
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
protected DataSet read(FragmentInstance fragmentInstance) {
- logger.info("Execute read plan in SchemaRegionStateMachine: {}", fragmentInstance);
+ logger.info("Execute read plan in SchemaRegionStateMachine");
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7c92e4216b..d8b4096b49 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.commons.service.IService;
@@ -28,6 +29,9 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.ServerConfigConsistent;
+import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -136,6 +140,8 @@ public class StorageEngine implements IService {
private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
private List<FlushListener> customFlushListeners = new ArrayList<>();
+ private final Map<ConsensusGroupId, BaseStateMachine> stateMachineMap = new ConcurrentHashMap<>();
+
private StorageEngine() {}
public static StorageEngine getInstance() {
@@ -1073,6 +1079,16 @@ public class StorageEngine implements IService {
}
}
+ public SchemaRegionStateMachine getOrCreateSchemaRegionStateMachine(ConsensusGroupId gid) {
+ return (SchemaRegionStateMachine)
+ stateMachineMap.computeIfAbsent(gid, id -> new SchemaRegionStateMachine());
+ }
+
+ public DataRegionStateMachine getOrCreateDataRegionStateMachine(ConsensusGroupId gid) {
+ return (DataRegionStateMachine)
+ stateMachineMap.computeIfAbsent(gid, id -> new DataRegionStateMachine());
+ }
+
static class InstanceHolder {
private static final StorageEngine INSTANCE = new StorageEngine();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index d5533e6cd7..eaa85865be 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.service;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.ConfigurationException;
@@ -62,7 +63,7 @@ public class DataNode implements DataNodeMBean {
*/
private static final int DEFAULT_JOIN_RETRY = 10;
- private EndPoint thisNode = new EndPoint();
+ private Endpoint thisNode = new Endpoint();
private int dataNodeID;
@@ -122,7 +123,7 @@ public class DataNode implements DataNodeMBean {
}
public void joinCluster() throws StartupException {
- List<EndPoint> configNodes;
+ List<Endpoint> configNodes;
try {
configNodes =
CommonUtils.parseNodeUrls(IoTDBDescriptor.getInstance().getConfig().getConfigNodeUrls());
@@ -134,12 +135,13 @@ public class DataNode implements DataNodeMBean {
while (retry > 0) {
// randomly pick up a config node to try
Random random = new Random();
- EndPoint configNode = configNodes.get(random.nextInt(configNodes.size()));
+ Endpoint configNode = configNodes.get(random.nextInt(configNodes.size()));
logger.info("start joining the cluster with the help of {}", configNode);
try {
ConfigIService.Client client = createClient(configNode);
DataNodeRegisterResp dataNodeRegisterResp =
- client.registerDataNode(new DataNodeRegisterReq(thisNode));
+ client.registerDataNode(
+ new DataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
if (dataNodeRegisterResp.getRegisterResult().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataNodeID = dataNodeRegisterResp.getDataNodeID();
@@ -199,13 +201,13 @@ public class DataNode implements DataNodeMBean {
private DataNodeHolder() {}
}
- private ConfigIService.Client createClient(EndPoint endPoint) throws IoTDBConnectionException {
+ private ConfigIService.Client createClient(Endpoint endpoint) throws IoTDBConnectionException {
TTransport transport;
try {
transport =
RpcTransportFactory.INSTANCE.getTransport(
// as there is a try-catch already, we do not need to use TSocket.wrap
- endPoint.getIp(), endPoint.getPort(), 2000);
+ endpoint.getIp(), endpoint.getPort(), 2000);
transport.open();
} catch (TTransportException e) {
throw new IoTDBConnectionException(e);