You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/17 14:34:25 UTC
[iotdb] 01/01: split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch jira3188
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3b5fb851641f8c90a9d0711b1c9669a4c5237b23
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue May 17 22:34:03 2022 +0800
split dataRegion/schemaRegion consensus protocol && multiLeaderConsensus init
---
.../resources/conf/iotdb-confignode.properties | 12 +-
.../iotdb/confignode/conf/ConfigNodeConf.java | 25 ++-
.../confignode/conf/ConfigNodeDescriptor.java | 9 +-
.../confignode/conf/ConfigNodeStartupCheck.java | 36 +++-
.../iotdb/confignode/manager/ConfigManager.java | 13 +-
.../iotdb/confignode/manager/NodeManager.java | 9 +-
.../consensus/request/ConfigRequestSerDeTest.java | 12 +-
.../iotdb/confignode/persistence/NodeInfoTest.java | 3 +-
.../confignode/persistence/PartitionInfoTest.java | 3 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 19 +-
consensus/pom.xml | 5 +
.../org/apache/iotdb/consensus/common/Peer.java | 15 ++
.../common/request/IndexedConsensusRequest.java | 55 +++++
.../consensus/multileader/IndexController.java | 130 ++++++++++++
.../multileader/MultiLeaderConsensus.java | 227 +++++++++++++++++++++
.../multileader/MultiLeaderServerImpl.java | 139 +++++++++++++
.../apache/iotdb/consensus/multileader/Utils.java | 29 +++
.../asyncLogAppender/AsyncLogAppender.java | 35 ++++
.../multileader/service/MultiLeaderRPCService.java | 99 +++++++++
.../service/MultiLeaderRPCServiceHandler.java | 51 +++++
.../service/MultiLeaderRPCServiceMBean.java | 22 ++
.../service/MultiLeaderRPCServiceProcessor.java | 74 +++++++
.../iotdb/consensus/ratis/RequestMessage.java | 3 +-
.../iotdb/commons/concurrent/ThreadName.java | 2 +
.../iotdb/commons/consensus/ConsensusGroupId.java | 8 +-
.../apache/iotdb/commons/service/ServiceType.java | 3 +-
.../iotdb/commons/partition/SerializeTest.java | 3 +-
.../commons/utils/ThriftCommonsSerDeUtilsTest.java | 6 +-
pom.xml | 2 +
.../resources/conf/iotdb-engine.properties | 7 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 80 ++++++--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 15 +-
...ensusImpl.java => DataRegionConsensusImpl.java} | 36 ++--
...susImpl.java => SchemaRegionConsensusImpl.java} | 40 ++--
.../consensus/statemachine/BaseStateMachine.java | 16 +-
.../statemachine/DataRegionStateMachine.java | 33 ++-
.../statemachine/SchemaRegionStateMachine.java | 14 +-
.../scheduler/FragmentInstanceDispatcherImpl.java | 23 ++-
.../java/org/apache/iotdb/db/service/DataNode.java | 12 +-
.../service/thrift/impl/InternalServiceImpl.java | 46 +++--
.../mpp/plan/plan/FragmentInstanceSerdeTest.java | 6 +-
.../iotdb/db/service/InternalServiceImplTest.java | 24 ++-
.../datanode1conf/iotdb-engine.properties | 3 +-
.../datanode2conf/iotdb-engine.properties | 3 +-
.../datanode3conf/iotdb-engine.properties | 3 +-
thrift-commons/src/main/thrift/common.thrift | 6 +-
.../src/main/thrift/confignode.thrift | 24 ++-
thrift-multi-leader-consensus/pom.xml | 47 +++++
.../src/main/thrift/mutlileader.thrift | 34 +++
49 files changed, 1330 insertions(+), 191 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 2ca585b2d1..310fed9224 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -54,14 +54,20 @@ target_confignode=0.0.0.0:22277
# All parameters in Consensus protocol configuration is unmodifiable after ConfigNode starts for the first time.
# And these parameters should be consistent within the ConfigNodeGroup.
-
-# DataNode consensus protocol type
+# DataRegion consensus protocol type
# These consensus protocols are currently supported:
# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# 3. org.apache.iotdb.consensus.multileader.MultiLeaderConsensus(Multi-leader replication protocol)
# Datatype: String
-# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
+# data_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
+# SchemaRegion consensus protocol type
+# These consensus protocols are currently supported:
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
+# Datatype: String
+# schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
####################
### PartitionSlot configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 923fe5c700..5bc9c9aefb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -58,8 +58,13 @@ public class ConfigNodeConf {
private final String configNodeConsensusProtocolClass =
"org.apache.iotdb.consensus.ratis.RatisConsensus";
- /** DataNode Regions consensus protocol */
- private String dataNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+ /** DataNode data region consensus protocol */
+ private String dataRegionConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+ /** DataNode schema region consensus protocol */
+ private String schemaRegionConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
/**
* ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
@@ -308,12 +313,20 @@ public class ConfigNodeConf {
return configNodeConsensusProtocolClass;
}
- public String getDataNodeConsensusProtocolClass() {
- return dataNodeConsensusProtocolClass;
+ public String getDataRegionConsensusProtocolClass() {
+ return dataRegionConsensusProtocolClass;
+ }
+
+ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+ this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
+ }
+
+ public String getSchemaRegionConsensusProtocolClass() {
+ return schemaRegionConsensusProtocolClass;
}
- public void setDataNodeConsensusProtocolClass(String dataNodeConsensusProtocolClass) {
- this.dataNodeConsensusProtocolClass = dataNodeConsensusProtocolClass;
+ public void setSchemaRegionConsensusProtocolClass(String schemaRegionConsensusProtocolClass) {
+ this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
}
public int getThriftServerAwaitTimeForStopService() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 240021ae40..4952a335d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -129,9 +129,14 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"series_partition_executor_class", conf.getSeriesPartitionExecutorClass()));
- conf.setDataNodeConsensusProtocolClass(
+ conf.setDataRegionConsensusProtocolClass(
properties.getProperty(
- "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass()));
+ "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass()));
+
+ conf.setSchemaRegionConsensusProtocolClass(
+ properties.getProperty(
+ "schema_region_consensus_protocol_class",
+ conf.getSchemaRegionConsensusProtocolClass()));
conf.setRpcAdvancedCompressionEnable(
Boolean.parseBoolean(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 7480d78e0c..7a98ca4912 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -95,7 +95,7 @@ public class ConfigNodeStartupCheck {
// When the DataNode consensus protocol is set to StandAlone,
// the replication factor must be 1
- if (conf.getDataNodeConsensusProtocolClass()
+ if (conf.getDataRegionConsensusProtocolClass()
.equals("org.apache.iotdb.consensus.standalone.StandAloneConsensus")) {
if (conf.getSchemaReplicationFactor() != 1) {
throw new ConfigurationException(
@@ -183,7 +183,8 @@ public class ConfigNodeStartupCheck {
new TConfigNodeLocation(
new TEndPoint(conf.getRpcAddress(), conf.getRpcPort()),
new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort())),
- conf.getDataNodeConsensusProtocolClass(),
+ conf.getDataRegionConsensusProtocolClass(),
+ conf.getSchemaRegionConsensusProtocolClass(),
conf.getSeriesPartitionSlotNum(),
conf.getSeriesPartitionExecutorClass(),
CommonDescriptor.getInstance().getConfig().getDefaultTTL(),
@@ -215,7 +216,9 @@ public class ConfigNodeStartupCheck {
systemProperties.setProperty(
"config_node_consensus_protocol_class", conf.getConfigNodeConsensusProtocolClass());
systemProperties.setProperty(
- "data_node_consensus_protocol_class", conf.getDataNodeConsensusProtocolClass());
+ "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass());
+ systemProperties.setProperty(
+ "schema_region_consensus_protocol_class", conf.getSchemaRegionConsensusProtocolClass());
// PartitionSlot configuration
systemProperties.setProperty(
@@ -282,15 +285,28 @@ public class ConfigNodeStartupCheck {
configNodeConsensusProtocolClass);
}
- String dataNodeConsensusProtocolClass =
- systemProperties.getProperty("data_node_consensus_protocol_class", null);
- if (dataNodeConsensusProtocolClass == null) {
+ String dataRegionConsensusProtocolClass =
+ systemProperties.getProperty("data_region_consensus_protocol_class", null);
+ if (dataRegionConsensusProtocolClass == null) {
+ needReWrite = true;
+ } else if (!dataRegionConsensusProtocolClass.equals(
+ conf.getDataRegionConsensusProtocolClass())) {
+ throw new ConfigurationException(
+ "data_region_consensus_protocol_class",
+ conf.getDataRegionConsensusProtocolClass(),
+ dataRegionConsensusProtocolClass);
+ }
+
+ String schemaRegionConsensusProtocolClass =
+ systemProperties.getProperty("schema_region_consensus_protocol_class", null);
+ if (schemaRegionConsensusProtocolClass == null) {
needReWrite = true;
- } else if (!dataNodeConsensusProtocolClass.equals(conf.getDataNodeConsensusProtocolClass())) {
+ } else if (!schemaRegionConsensusProtocolClass.equals(
+ conf.getSchemaRegionConsensusProtocolClass())) {
throw new ConfigurationException(
- "data_node_consensus_protocol_class",
- conf.getDataNodeConsensusProtocolClass(),
- dataNodeConsensusProtocolClass);
+ "schema_region_consensus_protocol_class",
+ conf.getSchemaRegionConsensusProtocolClass(),
+ schemaRegionConsensusProtocolClass);
}
// PartitionSlot configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 278859b9c4..d2ee15fc8b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -480,11 +480,20 @@ public class ConfigManager implements Manager {
ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
TConfigNodeRegisterResp errorResp = new TConfigNodeRegisterResp();
errorResp.setStatus(new TSStatus(TSStatusCode.ERROR_GLOBAL_CONFIG.getStatusCode()));
- if (!req.getDataNodeConsensusProtocolClass().equals(conf.getDataNodeConsensusProtocolClass())) {
+ if (!req.getDataRegionConsensusProtocolClass()
+ .equals(conf.getDataRegionConsensusProtocolClass())) {
errorResp
.getStatus()
.setMessage(
- "Reject register, please ensure that the data_node_consensus_protocol_class are consistent.");
+ "Reject register, please ensure that the data_region_consensus_protocol_class are consistent.");
+ return errorResp;
+ }
+ if (!req.getSchemaRegionConsensusProtocolClass()
+ .equals(conf.getSchemaRegionConsensusProtocolClass())) {
+ errorResp
+ .getStatus()
+ .setMessage(
+ "Reject register, please ensure that the schema_region_consensus_protocol_class are consistent.");
return errorResp;
}
if (req.getSeriesPartitionSlotNum() != conf.getSeriesPartitionSlotNum()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b5d778e8b5..10c3698a9f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
@@ -61,8 +60,10 @@ public class NodeManager {
private void setGlobalConfig(DataNodeConfigurationResp dataSet) {
// Set TGlobalConfig
TGlobalConfig globalConfig = new TGlobalConfig();
- globalConfig.setDataNodeConsensusProtocolClass(
- ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass());
+ globalConfig.setDataRegionConsensusProtocolClass(
+ ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+ globalConfig.setSchemaRegionConsensusProtocolClass(
+ ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass());
globalConfig.setSeriesPartitionSlotNum(
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
globalConfig.setSeriesPartitionExecutorClass(
@@ -149,7 +150,7 @@ public class NodeManager {
// Return PartitionRegionId
resp.setPartitionRegionId(
- ConsensusGroupId.convertToTConsensusGroupId(getConsensusManager().getConsensusGroupId()));
+ getConsensusManager().getConsensusGroupId().convertToTConsensusGroupId());
// Return online ConfigNodes
resp.setConfigNodeList(nodeInfo.getOnlineConfigNodes());
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index 89f9cdfefa..ad95524b9e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -85,7 +85,8 @@ public class ConfigRequestSerDeTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 7777));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
RegisterDataNodeReq req0 = new RegisterDataNodeReq(dataNodeLocation);
req0.serialize(buffer);
buffer.flip();
@@ -204,7 +205,8 @@ public class ConfigRequestSerDeTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
CreateRegionsReq req0 = new CreateRegionsReq();
TRegionReplicaSet dataRegionSet = new TRegionReplicaSet();
@@ -242,7 +244,8 @@ public class ConfigRequestSerDeTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
String storageGroup = "root.sg0";
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
@@ -303,7 +306,8 @@ public class ConfigRequestSerDeTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
String storageGroup = "root.sg0";
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(10);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index 2caf683a1b..438a539167 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -97,6 +97,7 @@ public class NodeInfoTest {
new TEndPoint("127.0.0.1", 6600 + flag),
new TEndPoint("127.0.0.1", 7700 + flag),
new TEndPoint("127.0.0.1", 8800 + flag),
- new TEndPoint("127.0.0.1", 9900 + flag));
+ new TEndPoint("127.0.0.1", 9900 + flag),
+ new TEndPoint("127.0.0.1", 1000 + flag));
}
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index 6e120afd86..e4286e72dc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -144,7 +144,8 @@ public class PartitionInfoTest {
tDataNodeLocation.setExternalEndPoint(new TEndPoint("127.0.0.1", 6000 + i));
tDataNodeLocation.setInternalEndPoint(new TEndPoint("127.0.0.1", 7000 + i));
tDataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("127.0.0.1", 8000 + i));
- tDataNodeLocation.setConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+ tDataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+ tDataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 10000 + i));
dataNodeLocations.add(tDataNodeLocation);
}
tRegionReplicaSet.setDataNodeLocations(dataNodeLocations);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index bc172462ab..f052fdca20 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -119,8 +119,11 @@ public class ConfigNodeRPCServiceProcessorTest {
private void checkGlobalConfig(TGlobalConfig globalConfig) {
Assert.assertEquals(
- ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass(),
- globalConfig.getDataNodeConsensusProtocolClass());
+ ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass(),
+ globalConfig.getDataRegionConsensusProtocolClass());
+ Assert.assertEquals(
+ ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass(),
+ globalConfig.getSchemaRegionConsensusProtocolClass());
Assert.assertEquals(
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
globalConfig.getSeriesPartitionSlotNum());
@@ -135,7 +138,8 @@ public class ConfigNodeRPCServiceProcessorTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation);
TDataNodeRegisterResp resp = processor.registerDataNode(req);
@@ -155,7 +159,8 @@ public class ConfigNodeRPCServiceProcessorTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6668));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8778));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation);
TDataNodeRegisterResp resp = processor.registerDataNode(req);
@@ -178,7 +183,8 @@ public class ConfigNodeRPCServiceProcessorTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
Assert.assertEquals(dataNodeLocation, locationList.get(i).getValue());
}
@@ -192,7 +198,8 @@ public class ConfigNodeRPCServiceProcessorTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6668));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8778));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
Assert.assertEquals(dataNodeLocation, locationMap.get(1));
}
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 7284a5fee7..0d6022ea10 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -52,6 +52,11 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>thrift-multi-leader-consensus</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index 99379156fb..b3bba6dace 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.consensus.common;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import java.nio.ByteBuffer;
import java.util.Objects;
// TODO Use a mature IDL framework such as Protobuf to manage this structure
@@ -43,6 +45,19 @@ public class Peer {
return endpoint;
}
+ /**
+ * Writes this peer into {@code buffer}: first the consensus group id (converted to its Thrift
+ * representation), then the endpoint.
+ *
+ * @param buffer the buffer the two fields are written to
+ */
+ public void serialize(ByteBuffer buffer) {
+ ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
+ groupId.convertToTConsensusGroupId(), buffer);
+ // The endpoint goes second; Deserialize reads the two fields back in this same order.
+ ThriftCommonsSerDeUtils.serializeTEndPoint(endpoint, buffer);
+ }
+
+ public static Peer Deserialize(ByteBuffer buffer) {
+ return new Peer(
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(
+ ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer)),
+ ThriftCommonsSerDeUtils.deserializeTEndPoint(buffer));
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
new file mode 100644
index 0000000000..a1dccbfb60
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Pairs an {@link IConsensusRequest} with two indexes, {@code minSyncIndex} and {@code
+ * currentIndex}. Only used for multi-leader consensus.
+ */
+public class IndexedConsensusRequest implements IConsensusRequest {
+
+  private final long minSyncIndex;
+  private final long currentIndex;
+  private final IConsensusRequest request;
+
+  public IndexedConsensusRequest(long minSyncIndex, long currentIndex, IConsensusRequest request) {
+    this.request = request;
+    this.currentIndex = currentIndex;
+    this.minSyncIndex = minSyncIndex;
+  }
+
+  public long getMinSyncIndex() {
+    return minSyncIndex;
+  }
+
+  public long getCurrentIndex() {
+    return currentIndex;
+  }
+
+  public IConsensusRequest getRequest() {
+    return request;
+  }
+
+  /** Writes {@code minSyncIndex}, then {@code currentIndex}, then the wrapped request. */
+  @Override
+  public void serializeRequest(ByteBuffer buffer) {
+    buffer.putLong(minSyncIndex).putLong(currentIndex);
+    request.serializeRequest(buffer);
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java
new file mode 100644
index 0000000000..6bad07da41
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/IndexController.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Hands out monotonically increasing indexes while limiting disk I/O: the index is checkpointed
+ * only after it has advanced {@code FLUSH_INTERVAL} past the previous checkpoint.
+ *
+ * <p>A checkpoint is an empty marker file named {@code prefix + index} inside {@code storageDir};
+ * moving the checkpoint forward renames that file.
+ */
+@ThreadSafe
+public class IndexController {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
+  private static final int FLUSH_INTERVAL = 500;
+
+  private volatile long lastFlushedIndex;
+  private volatile long currentIndex;
+
+  private final String storageDir;
+  private final String prefix;
+  private final boolean incrementIntervalAfterRestart;
+
+  /**
+   * Creates the controller and restores its state from the marker file in {@code storageDir}.
+   *
+   * @param storageDir directory that holds the marker file
+   * @param prefix marker file name prefix; a {@code '-'} separator is appended to it
+   * @param incrementIntervalAfterRestart when true, the restored index starts {@code
+   *     FLUSH_INTERVAL} past the last checkpoint (and is checkpointed right away) so that indexes
+   *     handed out after that checkpoint but before a failure are not handed out again
+   */
+  public IndexController(String storageDir, String prefix, boolean incrementIntervalAfterRestart) {
+    this.storageDir = storageDir;
+    this.prefix = prefix + '-';
+    this.incrementIntervalAfterRestart = incrementIntervalAfterRestart;
+    restore();
+  }
+
+  /**
+   * Advances the index by one.
+   *
+   * @return the new current index
+   */
+  public synchronized long incrementAndGet() {
+    currentIndex++;
+    checkPersist();
+    return currentIndex;
+  }
+
+  /**
+   * Raises the index to {@code index} if that is larger than the current value.
+   *
+   * @param index candidate index; declared as {@code long} so large indexes are not truncated
+   * @return the resulting current index
+   */
+  public synchronized long updateAndGet(long index) {
+    currentIndex = Math.max(currentIndex, index);
+    checkPersist();
+    return currentIndex;
+  }
+
+  public long getCurrentIndex() {
+    return currentIndex;
+  }
+
+  /** Checkpoints once the index is at least {@code FLUSH_INTERVAL} ahead of the last one. */
+  private void checkPersist() {
+    if (currentIndex - lastFlushedIndex >= FLUSH_INTERVAL) {
+      persist();
+    }
+  }
+
+  /**
+   * Moves the marker file from the last checkpointed index to the current index. {@code
+   * lastFlushedIndex} only advances when a marker for the current index really exists on disk.
+   */
+  private void persist() {
+    File oldFile = new File(storageDir, prefix + lastFlushedIndex);
+    File newFile = new File(storageDir, prefix + currentIndex);
+    try {
+      if (oldFile.exists()) {
+        FileUtils.moveFile(oldFile, newFile);
+      } else if (newFile.createNewFile()) {
+        // The previous marker vanished (or was never created); without this the checkpoint
+        // would be reported as written although nothing is on disk.
+        logger.warn(
+            "Version file {} is missing, created {} instead",
+            oldFile.getAbsolutePath(),
+            newFile.getAbsolutePath());
+      }
+      logger.info(
+          "Version file updated, previous: {}, current: {}",
+          oldFile.getAbsolutePath(),
+          newFile.getAbsolutePath());
+      lastFlushedIndex = currentIndex;
+    } catch (IOException e) {
+      logger.error("Error occurred when flushing next version.", e);
+    }
+  }
+
+  /**
+   * Restores {@code lastFlushedIndex} and {@code currentIndex} from the marker file with the
+   * highest version, deleting older markers; creates the initial marker if none exists.
+   */
+  private void restore() {
+    File directory = new File(storageDir);
+    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
+    File latestFile = null;
+    long maxVersion = 0;
+    if (versionFiles != null) {
+      for (File candidate : versionFiles) {
+        long fileVersion;
+        try {
+          // Strip the known prefix instead of splitting on '-': the prefix may contain dashes.
+          fileVersion = Long.parseLong(candidate.getName().substring(prefix.length()));
+        } catch (NumberFormatException e) {
+          logger.warn("Skip file {} because its name does not end with a version", candidate);
+          continue;
+        }
+        if (latestFile == null || fileVersion > maxVersion) {
+          if (latestFile != null) {
+            deleteStaleVersionFile(latestFile);
+          }
+          latestFile = candidate;
+          maxVersion = fileVersion;
+        } else {
+          deleteStaleVersionFile(candidate);
+        }
+      }
+    }
+    if (latestFile != null) {
+      lastFlushedIndex = maxVersion;
+    } else {
+      if (!directory.isDirectory() && !directory.mkdirs()) {
+        logger.warn("Cannot create storage directory {}", directory);
+      }
+      File versionFile = new File(directory, prefix + "0");
+      lastFlushedIndex = 0;
+      try {
+        if (!versionFile.createNewFile()) {
+          logger.warn("Cannot create new version file {}", versionFile);
+        }
+      } catch (IOException e) {
+        logger.error("Error occurred when creating new file {}.", versionFile.getName(), e);
+      }
+    }
+    if (incrementIntervalAfterRestart) {
+      // prevent overlapping in case of failure
+      currentIndex = lastFlushedIndex + FLUSH_INTERVAL;
+      persist();
+    } else {
+      currentIndex = lastFlushedIndex;
+    }
+  }
+
+  /** Deletes a superseded marker file, logging instead of failing when it cannot be removed. */
+  private void deleteStaleVersionFile(File staleFile) {
+    if (!staleFile.delete()) {
+      logger.warn("Cannot delete stale version file {}", staleFile);
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
new file mode 100644
index 0000000000..4010e7b75c
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.IStateMachine.Registry;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
+import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link IConsensus} implementation that keeps one {@link MultiLeaderServerImpl} per consensus
+ * group.
+ *
+ * <p>Each group owns a directory under {@code storageDir} named {@code <type>_<id>_<ip>_<port>};
+ * those names are parsed again on {@link #start()} to recover the groups.
+ *
+ * <p>Membership changes, leader transfer and snapshot triggering are not implemented yet and
+ * always answer with {@code success = false}. {@link #isLeader} always returns {@code true}.
+ */
+public class MultiLeaderConsensus implements IConsensus {
+
+  private static final Logger logger = LoggerFactory.getLogger(MultiLeaderConsensus.class);
+
+  private final TEndPoint thisNode;
+  private final File storageDir;
+  private final IStateMachine.Registry registry;
+  private final Map<ConsensusGroupId, MultiLeaderServerImpl> stateMachineMap =
+      new ConcurrentHashMap<>();
+  private final MultiLeaderRPCService service;
+
+  /**
+   * @param thisNode endpoint of the local node; also used to build the RPC service
+   * @param storageDir root directory holding one sub-directory per consensus group
+   * @param registry supplies the state machine for a given consensus group
+   */
+  public MultiLeaderConsensus(TEndPoint thisNode, File storageDir, Registry registry) {
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
+    this.registry = registry;
+    this.service = new MultiLeaderRPCService(thisNode);
+  }
+
+  /** Recovers the groups found on disk, then initializes the RPC service implementation. */
+  @Override
+  public void start() throws IOException {
+    initAndRecover();
+    service.initSyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
+  }
+
+  /**
+   * Creates {@code storageDir} when it is missing; otherwise rebuilds and starts one server per
+   * sub-directory, decoding group type, group id, ip and port from the directory name.
+   */
+  private void initAndRecover() throws IOException {
+    if (!storageDir.exists()) {
+      if (!storageDir.mkdirs()) {
+        logger.warn("Unable to create consensus dir at {}", storageDir);
+      }
+    } else {
+      try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
+        for (Path path : stream) {
+          String[] items = path.getFileName().toString().split("_");
+          ConsensusGroupId consensusGroupId =
+              ConsensusGroupId.Factory.create(
+                  TConsensusGroupType.valueOf(items[0]).getValue(), Integer.parseInt(items[1]));
+          TEndPoint endPoint = new TEndPoint(items[2], Integer.parseInt(items[3]));
+          MultiLeaderServerImpl impl =
+              new MultiLeaderServerImpl(
+                  path.toString(),
+                  new Peer(consensusGroupId, endPoint),
+                  registry.apply(consensusGroupId));
+          // Start recovered groups just like addConsensusGroup starts newly created ones
+          impl.start();
+          stateMachineMap.put(consensusGroupId, impl);
+        }
+      }
+    }
+  }
+
+  /** Currently a no-op: nothing is released here yet. */
+  @Override
+  public void stop() throws IOException {}
+
+  /** Forwards the write to the group's server, or reports that the group does not exist. */
+  @Override
+  public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(impl.write(request)).build();
+  }
+
+  /** Forwards the read to the group's server, or reports that the group does not exist. */
+  @Override
+  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
+  }
+
+  /**
+   * Creates, starts and registers the server of a new group.
+   *
+   * @return a success response, or one carrying {@link ConsensusGroupAlreadyExistException} when
+   *     the group is already registered
+   */
+  @Override
+  public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    AtomicBoolean exist = new AtomicBoolean(true);
+    stateMachineMap.computeIfAbsent(
+        groupId,
+        (k) -> {
+          exist.set(false);
+          String path = buildPeerDir(groupId);
+          File file = new File(path);
+          if (!file.mkdirs()) {
+            logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
+          }
+          MultiLeaderServerImpl impl =
+              new MultiLeaderServerImpl(
+                  path, new Peer(groupId, thisNode), peers, registry.apply(groupId));
+          impl.start();
+          return impl;
+        });
+    if (exist.get()) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupAlreadyExistException(groupId))
+          .build();
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+  }
+
+  /**
+   * Stops and unregisters the server of a group and removes its directory.
+   *
+   * @return a success response, or one carrying {@link ConsensusGroupNotExistException} when the
+   *     group is not registered
+   */
+  @Override
+  public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
+    AtomicBoolean exist = new AtomicBoolean(false);
+    stateMachineMap.computeIfPresent(
+        groupId,
+        (k, v) -> {
+          exist.set(true);
+          v.stop();
+          String path = buildPeerDir(groupId);
+          // File.delete() refuses non-empty directories, and the group directory still holds
+          // its persisted files, so the whole tree has to be removed.
+          if (!deleteRecursively(new File(path))) {
+            logger.warn("Unable to delete consensus dir for group {} at {}", groupId, path);
+          }
+          return null;
+        });
+
+    if (!exist.get()) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+  }
+
+  /** Not implemented yet: always answers with {@code success = false}. */
+  @Override
+  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented yet: always answers with {@code success = false}. */
+  @Override
+  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented yet: always answers with {@code success = false}. */
+  @Override
+  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented yet: always answers with {@code success = false}. */
+  @Override
+  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Not implemented yet: always answers with {@code success = false}. */
+  @Override
+  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  }
+
+  /** Always {@code true}, regardless of the group. */
+  @Override
+  public boolean isLeader(ConsensusGroupId groupId) {
+    return true;
+  }
+
+  /** Returns the local node as leader of a registered group, or {@code null} if unknown. */
+  @Override
+  public Peer getLeader(ConsensusGroupId groupId) {
+    if (!stateMachineMap.containsKey(groupId)) {
+      return null;
+    }
+    return new Peer(groupId, thisNode);
+  }
+
+  /** Returns the server registered for the group, or {@code null} if there is none. */
+  public MultiLeaderServerImpl getImpl(ConsensusGroupId groupId) {
+    return stateMachineMap.get(groupId);
+  }
+
+  /** Builds the group directory path {@code storageDir/<type>_<id>_<ip>_<port>}. */
+  private String buildPeerDir(ConsensusGroupId groupId) {
+    return storageDir
+        + File.separator
+        + groupId.getType()
+        + "_"
+        + groupId.getId()
+        + "_"
+        + thisNode.getIp()
+        + "_"
+        + thisNode.getPort();
+  }
+
+  /**
+   * Deletes a file or a whole directory tree, removing as much as possible.
+   *
+   * @return {@code true} only if {@code file} and everything below it was deleted
+   */
+  private static boolean deleteRecursively(File file) {
+    boolean childrenDeleted = true;
+    // Never descend through a symbolic link; removing the link itself is enough
+    if (!Files.isSymbolicLink(file.toPath())) {
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File child : children) {
+          childrenDeleted &= deleteRecursively(child);
+        }
+      }
+    }
+    return file.delete() && childrenDeleted;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
new file mode 100644
index 0000000000..a1f73b525d
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.asyncLogAppender.AsyncLogAppender;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MultiLeaderServerImpl {
+
+ private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
+ private final String configurationFileName = "configuration.dat";
+ private final Peer thisNode;
+ private final IStateMachine stateMachine;
+ private final String storageDir;
+ private List<Peer> configuration;
+ private IndexController currentNodeController;
+ private List<AsyncLogAppender> asyncLogAppenders;
+
+ private static final int DEFAULT_BUFFER_SIZE = 1024 * 2;
+
+ public MultiLeaderServerImpl(
+ String storageDir, Peer thisNode, List<Peer> configuration, IStateMachine stateMachine) {
+ this.storageDir = storageDir;
+ this.thisNode = thisNode;
+ this.stateMachine = stateMachine;
+ this.currentNodeController =
+ new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), false);
+ this.configuration = configuration;
+ persistConfiguration();
+ // configuration.stream().filter(x -> !Objects.equals(x, thisNode)).
+
+ }
+
+ public MultiLeaderServerImpl(String storageDir, Peer thisNode, IStateMachine stateMachine) {
+ this.storageDir = storageDir;
+ this.thisNode = thisNode;
+ this.stateMachine = stateMachine;
+ this.currentNodeController =
+ new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), false);
+ this.configuration = new ArrayList<>();
+ recoverConfiguration();
+ }
+
+ public IStateMachine getStateMachine() {
+ return stateMachine;
+ }
+
+ public void start() {
+ stateMachine.start();
+ }
+
+ public void stop() {
+ stateMachine.stop();
+ }
+
+ public TSStatus write(IConsensusRequest request) {
+ synchronized (stateMachine) {
+ IndexedConsensusRequest newRequest =
+ new IndexedConsensusRequest(
+ Long.MAX_VALUE, currentNodeController.incrementAndGet(), request);
+ return stateMachine.write(newRequest);
+ }
+ }
+
+ public DataSet read(IConsensusRequest request) {
+ return stateMachine.read(request);
+ }
+
+ public boolean takeSnapshot(File snapshotDir) {
+ return stateMachine.takeSnapshot(snapshotDir);
+ }
+
+ public void loadSnapshot(File latestSnapshotRootDir) {
+ stateMachine.loadSnapshot(latestSnapshotRootDir);
+ }
+
+ public void persistConfiguration() {
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ buffer.putInt(configuration.size());
+ for (Peer peer : configuration) {
+ peer.serialize(buffer);
+ }
+ try {
+ Files.write(
+ Paths.get(new File(storageDir, configurationFileName).getAbsolutePath()), buffer.array());
+ } catch (IOException e) {
+ logger.error("Unexpected error occurs when persisting configuration", e);
+ }
+ }
+
+ public void recoverConfiguration() {
+ ByteBuffer buffer;
+ try {
+ buffer =
+ ByteBuffer.wrap(
+ Files.readAllBytes(
+ Paths.get(new File(storageDir, configurationFileName).getAbsolutePath())));
+ int size = buffer.getInt();
+ for (int i = 0; i < size; i++) {
+ configuration.add(Peer.Deserialize(buffer));
+ }
+ } catch (IOException e) {
+ logger.error("Unexpected error occurs when recovering configuration", e);
+ }
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java
new file mode 100644
index 0000000000..a89f47513a
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/Utils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+/** Helper methods shared by the multi-leader consensus implementation. */
+public class Utils {
+
+  /**
+   * Renders an endpoint as {@code <ip>-<port>}.
+   *
+   * <p>Plain concatenation is used on purpose: {@code String.format("%d", ...)} localizes digits
+   * according to the default locale, so the result could differ between machines.
+   *
+   * @param endpoint endpoint to render
+   * @return the ip and the port joined by {@code '-'}
+   */
+  public static String fromTEndPointToString(TEndPoint endpoint) {
+    return endpoint.getIp() + "-" + endpoint.getPort();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java
new file mode 100644
index 0000000000..7d53bbb208
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/asyncLogAppender/AsyncLogAppender.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.asyncLogAppender;
+
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.multileader.IndexController;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Placeholder for the component that appends logs to a remote peer asynchronously.
+ *
+ * <p>The class only declares state so far: none of the fields is assigned or read here.
+ */
+public class AsyncLogAppender {
+
+ // NOTE(review): presumably the local server this appender belongs to - confirm once wired up
+ private MultiLeaderServerImpl impl;
+ // NOTE(review): presumably the remote peer the logs are sent to - confirm once wired up
+ private Peer peer;
+ // Bounded queue of requests; presumably those not yet sent to the peer - TODO confirm
+ private ArrayBlockingQueue<IConsensusRequest> pendingRequest;
+ // NOTE(review): presumably tracks the replication index for this peer - confirm once wired up
+ private IndexController controller;
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
new file mode 100644
index 0000000000..455bd73fe8
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Thrift service exposing {@link MultiLeaderRPCServiceProcessor} on the endpoint of the local
+ * node.
+ */
+public class MultiLeaderRPCService extends ThriftService implements MultiLeaderRPCServiceMBean {
+
+  // TODO make it configurable
+  private static final int RPC_MAX_CONCURRENT_CLIENT_NUM = 65535;
+  private static final int THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE = 60;
+  private static final boolean IS_RPC_THRIFT_COMPRESSION_ENABLED = false;
+
+  private final TEndPoint thisNode;
+  private MultiLeaderRPCServiceProcessor multiLeaderRPCServiceProcessor;
+
+  /** @param thisNode endpoint whose ip and port the service binds to */
+  public MultiLeaderRPCService(TEndPoint thisNode) {
+    this.thisNode = thisNode;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MULTI_LEADER_CONSENSUS_SERVICE;
+  }
+
+  /**
+   * Stores the processor, derives the MBean name and hands over to the parent implementation.
+   *
+   * @param multiLeaderRPCServiceProcessor must be a {@link MultiLeaderRPCServiceProcessor}
+   */
+  @Override
+  public void initSyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
+    this.multiLeaderRPCServiceProcessor =
+        (MultiLeaderRPCServiceProcessor) multiLeaderRPCServiceProcessor;
+    super.mbeanName =
+        String.format(
+            "%s:%s=%s", this.getClass().getPackage(), IoTDBConstant.JMX_TYPE, getID().getJmxName());
+    super.initSyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
+  }
+
+  /** Wraps the stored processor into the generated Thrift processor. */
+  @Override
+  public void initTProcessor()
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+          NoSuchMethodException, InvocationTargetException {
+    processor = new MultiLeaderConsensusIService.Processor<>(multiLeaderRPCServiceProcessor);
+  }
+
+  /**
+   * Creates and names the thread that runs the Thrift server.
+   *
+   * @throws IllegalAccessException when the server thread cannot be created; the original {@link
+   *     RPCServiceException} is attached as the cause
+   */
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    try {
+      thriftServiceThread =
+          new ThriftServiceThread(
+              processor,
+              getID().getName(),
+              ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
+              getBindIP(),
+              getBindPort(),
+              RPC_MAX_CONCURRENT_CLIENT_NUM,
+              THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE,
+              new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
+              IS_RPC_THRIFT_COMPRESSION_ENABLED);
+    } catch (RPCServiceException e) {
+      // IllegalAccessException has no cause-taking constructor, so attach the cause explicitly
+      IllegalAccessException wrapped = new IllegalAccessException(e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+    thriftServiceThread.setName(ThreadName.MULTI_LEADER_CONSENSUS_RPC_SERVER.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    return thisNode.getIp();
+  }
+
+  @Override
+  public int getBindPort() {
+    return thisNode.getPort();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
new file mode 100644
index 0000000000..889ac7d517
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Thrift server event handler whose only action is to tell the RPC processor when a client
+ * context is torn down; every other callback does nothing.
+ */
+public class MultiLeaderRPCServiceHandler implements TServerEventHandler {
+
+  private final MultiLeaderRPCServiceProcessor serviceProcessor;
+
+  /** @param processor processor to notify when a client context is deleted */
+  public MultiLeaderRPCServiceHandler(MultiLeaderRPCServiceProcessor processor) {
+    serviceProcessor = processor;
+  }
+
+  /** No preparation is needed before serving. */
+  @Override
+  public void preServe() {
+    // intentionally empty
+  }
+
+  /** No per-connection context is kept, hence {@code null}. */
+  @Override
+  public ServerContext createContext(TProtocol input, TProtocol output) {
+    return null;
+  }
+
+  /** Lets the processor react to the client going away. */
+  @Override
+  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+    serviceProcessor.handleClientExit();
+  }
+
+  /** Nothing to do per request. */
+  @Override
+  public void processContext(
+      ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
+    // intentionally empty
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
new file mode 100644
index 0000000000..9e354ac204
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+/**
+ * Member-less interface implemented by {@code MultiLeaderRPCService}.
+ *
+ * <p>NOTE(review): presumably the JMX management interface of that service - confirm.
+ */
+public interface MultiLeaderRPCServiceMBean {}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
new file mode 100644
index 0000000000..8cea5f4007
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.service;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
+import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Server-side handler of the multi-leader consensus Thrift interface: applies the batches of a
+ * sync-log request to the state machine of the addressed consensus group.
+ */
+public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.Iface {
+
+  private final Logger logger = LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
+
+  private final MultiLeaderConsensus consensus;
+
+  /** @param consensus consensus layer used to look up the server of a group */
+  public MultiLeaderRPCServiceProcessor(MultiLeaderConsensus consensus) {
+    this.consensus = consensus;
+  }
+
+  /**
+   * Applies every batch of the request, in order, to the state machine of the target group.
+   *
+   * @param req request carrying the consensus group id and the batches to apply
+   * @return one status per batch, or a single {@code INTERNAL_SERVER_ERROR} status when the
+   *     group is not known to this node
+   */
+  @Override
+  public TSyncLogRes syncLog(TSyncLogReq req) throws TException {
+    ConsensusGroupId targetGroup =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl server = consensus.getImpl(targetGroup);
+    if (server != null) {
+      return new TSyncLogRes(applyBatches(server, req));
+    }
+
+    String message =
+        String.format("Unexpected consensusGroupId %s for TSyncLogReq %s", targetGroup, req);
+    logger.error(message);
+    TSStatus failure = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    failure.setMessage(message);
+    return new TSyncLogRes(Collections.singletonList(failure));
+  }
+
+  /** Writes each batch to the state machine while holding the state machine's monitor. */
+  private List<TSStatus> applyBatches(MultiLeaderServerImpl server, TSyncLogReq req) {
+    List<TSStatus> results = new ArrayList<>();
+    synchronized (server.getStateMachine()) {
+      for (ByteBuffer batch : req.getBatches()) {
+        ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(batch);
+        results.add(server.getStateMachine().write(request));
+      }
+    }
+    return results;
+  }
+
+  /** Called when a client connection goes away; nothing to clean up at the moment. */
+  public void handleClientExit() {}
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 242573e5be..d1af672b85 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -46,9 +46,8 @@ public class RequestMessage implements Message {
if (serializedContent == null) {
synchronized (this) {
if (serializedContent == null) {
- ByteBufferConsensusRequest req;
if (actualRequest instanceof ByteBufferConsensusRequest) {
- req = (ByteBufferConsensusRequest) actualRequest;
+ ByteBufferConsensusRequest req = (ByteBufferConsensusRequest) actualRequest;
serializedContent = ByteString.copyFrom(req.getContent());
req.getContent().flip();
} else {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index a9222f36ad..c691022c78 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -74,6 +74,8 @@ public enum ThreadName {
CLUSTER_MONITOR("ClusterMonitor"),
CONFIG_NODE_RPC_SERVER("ConfigNodeRpcServer"),
CONFIG_NODE_RPC_CLIENT("ConfigNodeRPC-Client"),
+ MULTI_LEADER_CONSENSUS_RPC_CLIENT("MultiLeaderConsensusRPC-Client"),
+ MULTI_LEADER_CONSENSUS_RPC_SERVER("MultiLeaderConsensusRPC-Server"),
DATA_NODE_MANAGEMENT_RPC_SERVER("DataNodeManagementRPC"),
DATA_NODE_MANAGEMENT_RPC_CLIENT("DataNodeManagementRPC-Client"),
Cluster_Monitor("ClusterMonitor"),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index 0bc5556c15..2a5ab281f2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -37,6 +37,10 @@ public abstract class ConsensusGroupId {
// return specific type
public abstract TConsensusGroupType getType();
+ public TConsensusGroupId convertToTConsensusGroupId() {
+ return new TConsensusGroupId(getType(), getId());
+ }
+
@Override
public int hashCode() {
return Objects.hash(getType(), getId());
@@ -82,10 +86,6 @@ public abstract class ConsensusGroupId {
}
}
- public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId consensusGroupId) {
- return new TConsensusGroupId(consensusGroupId.getType(), consensusGroupId.getId());
- }
-
public static String formatTConsensusGroupId(TConsensusGroupId groupId) {
StringBuilder format = new StringBuilder();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 6b068618ce..edece800fe 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -72,7 +72,8 @@ public enum ServiceType {
FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
DATA_BLOCK_MANAGER_SERVICE("Data block manager", "DataBlockManager"),
INTERNAL_SERVICE("Internal Service", "InternalService"),
- PROCEDURE_SERVICE("Procedure Service", "ProcedureService");
+ PROCEDURE_SERVICE("Procedure Service", "ProcedureService"),
+ MULTI_LEADER_CONSENSUS_SERVICE("Multi Leader consensus Service", "MultiLeaderRPCService");
private final String name;
private final String jmxName;
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
index 1e032efc62..1f4f3152d6 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/partition/SerializeTest.java
@@ -68,7 +68,8 @@ public abstract class SerializeTest {
tDataNodeLocation.setExternalEndPoint(new TEndPoint("127.0.0.1", 6000 + i));
tDataNodeLocation.setInternalEndPoint(new TEndPoint("127.0.0.1", 7000 + i));
tDataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("127.0.0.1", 8000 + i));
- tDataNodeLocation.setConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+ tDataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 9000 + i));
+ tDataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("127.0.0.1", 10000 + i));
dataNodeLocations.add(tDataNodeLocation);
}
tRegionReplicaSet.setDataNodeLocations(dataNodeLocations);
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
index 0abde8a0fa..94a2a81427 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
@@ -58,7 +58,8 @@ public class ThriftCommonsSerDeUtilsTest {
dataNodeLocation0.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation0.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
- dataNodeLocation0.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation0, buffer);
buffer.flip();
TDataNodeLocation dataNodeLocation1 =
@@ -108,7 +109,8 @@ public class ThriftCommonsSerDeUtilsTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
regionReplicaSet0.getDataNodeLocations().add(dataNodeLocation);
}
ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet0, buffer);
diff --git a/pom.xml b/pom.xml
index e9f0829436..4bd7cfe5e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
<module>thrift</module>
<module>thrift-commons</module>
<module>thrift-confignode</module>
+ <module>thrift-multi-leader-consensus</module>
<module>thrift-cluster</module>
<module>thrift-sync</module>
<module>thrift-influxdb</module>
@@ -790,6 +791,7 @@
<sourceDirectory>thrift/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-commons/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-confignode/target/generated-sources/thrift</sourceDirectory>
+ <sourceDirectory>thrift-multi-leader-consensus/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-sync/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-cluster/target/generated-sources/thrift</sourceDirectory>
<sourceDirectory>thrift-influxdb/target/generated-sources/thrift</sourceDirectory>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3235dd92e4..9aad88b81b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -51,10 +51,13 @@ internal_ip=127.0.0.1
# port for coordinator's communication between cluster nodes.
internal_port=9003
+# Datatype: int
+# port for consensus's communication for data region between cluster nodes.
+data_region_consensus_port=40010
# Datatype: int
-# port for consensus's communication between cluster nodes.
-consensus_port=40010
+# port for consensus's communication for schema region between cluster nodes.
+schema_region_consensus_port=50010
# comma-separated {IP/DOMAIN}:internal_port pairs
# Data nodes store config nodes ip and port to communicate with config nodes.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8cf84cf10c..3ff42792bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -263,6 +263,12 @@ public class IoTDBConfig {
/** Consensus directory. */
private String consensusDir = DEFAULT_BASE_DIR + File.separator + "consensus";
+ private String dataRegionConsensusDir =
+ DEFAULT_BASE_DIR + File.separator + "consensus" + File.separator + "dataRegion";
+
+ private String schemaRegionConsensusDir =
+ DEFAULT_BASE_DIR + File.separator + "consensus" + File.separator + "schemaRegion";
+
/** Maximum MemTable number. Invalid when enableMemControl is true. */
private int maxMemtableNumber = 0;
@@ -812,8 +818,11 @@ public class IoTDBConfig {
/** Internal port for coordinator */
private int internalPort = 9003;
- /** Internal port for consensus protocol */
- private int consensusPort = 40010;
+ /** Internal port for dataRegion consensus protocol */
+ private int dataRegionConsensusPort = 40010;
+
+ /** Internal port for schemaRegion consensus protocol */
+ private int schemaRegionConsensusPort = 50010;
/** Ip and port of config nodes. */
private List<TEndPoint> configNodeList =
@@ -823,11 +832,20 @@ public class IoTDBConfig {
private long joinClusterTimeOutMs = TimeUnit.SECONDS.toMillis(5);
/**
- * The consensus protocol class. The Datanode should communicate with ConfigNode on startup and
- * set this variable so that the correct class name can be obtained later when the consensus layer
- * singleton is initialized
+ * The consensus protocol class for data region. The Datanode should communicate with ConfigNode
+ * on startup and set this variable so that the correct class name can be obtained later when the
+ * data region consensus layer singleton is initialized
+ */
+ private String dataRegionConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+ /**
+ * The consensus protocol class for schema region. The Datanode should communicate with ConfigNode
+ * on startup and set this variable so that the correct class name can be obtained later when the
+ * schema region consensus layer singleton is initialized
*/
- private String consensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+ private String schemaRegionConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
/**
* The series partition executor class. The Datanode should communicate with ConfigNode on startup
@@ -1161,6 +1179,24 @@ public class IoTDBConfig {
public void setConsensusDir(String consensusDir) {
this.consensusDir = consensusDir;
+ setDataRegionConsensusDir(consensusDir + File.separator + "dataRegion");
+ setSchemaRegionConsensusDir(consensusDir + File.separator + "schemaRegion");
+ }
+
+ public String getDataRegionConsensusDir() {
+ return dataRegionConsensusDir;
+ }
+
+ public void setDataRegionConsensusDir(String dataRegionConsensusDir) {
+ this.dataRegionConsensusDir = dataRegionConsensusDir;
+ }
+
+ public String getSchemaRegionConsensusDir() {
+ return schemaRegionConsensusDir;
+ }
+
+ public void setSchemaRegionConsensusDir(String schemaRegionConsensusDir) {
+ this.schemaRegionConsensusDir = schemaRegionConsensusDir;
}
public String getExtDir() {
@@ -2620,12 +2656,20 @@ public class IoTDBConfig {
this.internalPort = internalPort;
}
- public int getConsensusPort() {
- return consensusPort;
+ public int getDataRegionConsensusPort() {
+ return dataRegionConsensusPort;
}
- public void setConsensusPort(int consensusPort) {
- this.consensusPort = consensusPort;
+ public void setDataRegionConsensusPort(int dataRegionConsensusPort) {
+ this.dataRegionConsensusPort = dataRegionConsensusPort;
+ }
+
+ public int getSchemaRegionConsensusPort() {
+ return schemaRegionConsensusPort;
+ }
+
+ public void setSchemaRegionConsensusPort(int schemaRegionConsensusPort) {
+ this.schemaRegionConsensusPort = schemaRegionConsensusPort;
}
public List<TEndPoint> getConfigNodeList() {
@@ -2644,12 +2688,20 @@ public class IoTDBConfig {
this.joinClusterTimeOutMs = joinClusterTimeOutMs;
}
- public String getConsensusProtocolClass() {
- return consensusProtocolClass;
+ public String getDataRegionConsensusProtocolClass() {
+ return dataRegionConsensusProtocolClass;
+ }
+
+ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+ this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
+ }
+
+ public String getSchemaRegionConsensusProtocolClass() {
+ return schemaRegionConsensusProtocolClass;
}
- public void setConsensusProtocolClass(String consensusProtocolClass) {
- this.consensusProtocolClass = consensusProtocolClass;
+ public void setSchemaRegionConsensusProtocolClass(String schemaRegionConsensusProtocolClass) {
+ this.schemaRegionConsensusProtocolClass = schemaRegionConsensusProtocolClass;
}
public String getSeriesPartitionExecutorClass() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 40b680bfed..517dcab53c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1555,9 +1555,17 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty("internal_port", Integer.toString(conf.getInternalPort()))));
- conf.setConsensusPort(
+ conf.setDataRegionConsensusPort(
Integer.parseInt(
- properties.getProperty("consensus_port", Integer.toString(conf.getConsensusPort()))));
+ properties.getProperty(
+ "data_region_consensus_port",
+ Integer.toString(conf.getDataRegionConsensusPort()))));
+
+ conf.setSchemaRegionConsensusPort(
+ Integer.parseInt(
+ properties.getProperty(
+ "schema_region_consensus_port",
+ Integer.toString(conf.getSchemaRegionConsensusPort()))));
}
public void loadShuffleProps(Properties properties) {
@@ -1608,7 +1616,8 @@ public class IoTDBDescriptor {
// These configurations are received from config node when registering
public void loadGlobalConfig(TGlobalConfig globalConfig) {
conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass());
- conf.setConsensusProtocolClass(globalConfig.getDataNodeConsensusProtocolClass());
+ conf.setDataRegionConsensusProtocolClass(globalConfig.getDataRegionConsensusProtocolClass());
+ conf.setSchemaRegionConsensusProtocolClass(globalConfig.getSchemaRegionConsensusProtocolClass());
conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum());
conf.setPartitionInterval(globalConfig.timePartitionInterval);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
copy to server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 96fd26af89..692e229f9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -21,15 +21,12 @@ package org.apache.iotdb.db.consensus;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
-import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import java.io.File;
@@ -37,41 +34,32 @@ import java.io.File;
* We can use ConsensusImpl.getInstance() to obtain a consensus layer reference for reading and
* writing
*/
-public class ConsensusImpl {
+public class DataRegionConsensusImpl {
- private ConsensusImpl() {}
+ private DataRegionConsensusImpl() {}
public static IConsensus getInstance() {
- return ConsensusImplHolder.INSTANCE;
+ return DataRegionConsensusImplHolder.INSTANCE;
}
- private static class ConsensusImplHolder {
+ private static class DataRegionConsensusImplHolder {
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
private static final IConsensus INSTANCE =
ConsensusFactory.getConsensusImpl(
- conf.getConsensusProtocolClass(),
- new TEndPoint(conf.getInternalIp(), conf.getConsensusPort()),
- new File(conf.getConsensusDir()),
- gid -> {
- switch (gid.getType()) {
- case SchemaRegion:
- return new SchemaRegionStateMachine(
- SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid));
- case DataRegion:
- return new DataRegionStateMachine(
- StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid));
- }
- throw new IllegalArgumentException(
- String.format("Unexpected consensusGroup %s", gid));
- })
+ conf.getDataRegionConsensusProtocolClass(),
+ new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()),
+ new File(conf.getDataRegionConsensusDir()),
+ gid ->
+ new DataRegionStateMachine(
+ StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
- conf.getConsensusProtocolClass())));
+ conf.getDataRegionConsensusProtocolClass())));
- private ConsensusImplHolder() {}
+ private DataRegionConsensusImplHolder() {}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 96fd26af89..87e99f98a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -20,58 +20,42 @@
package org.apache.iotdb.db.consensus;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
-import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import java.io.File;
-/**
- * We can use ConsensusImpl.getInstance() to obtain a consensus layer reference for reading and
- * writing
- */
-public class ConsensusImpl {
+public class SchemaRegionConsensusImpl {
- private ConsensusImpl() {}
+ private SchemaRegionConsensusImpl() {}
public static IConsensus getInstance() {
- return ConsensusImplHolder.INSTANCE;
+ return SchemaRegionConsensusImplHolder.INSTANCE;
}
- private static class ConsensusImplHolder {
+ private static class SchemaRegionConsensusImplHolder {
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
private static final IConsensus INSTANCE =
ConsensusFactory.getConsensusImpl(
- conf.getConsensusProtocolClass(),
- new TEndPoint(conf.getInternalIp(), conf.getConsensusPort()),
- new File(conf.getConsensusDir()),
- gid -> {
- switch (gid.getType()) {
- case SchemaRegion:
- return new SchemaRegionStateMachine(
- SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid));
- case DataRegion:
- return new DataRegionStateMachine(
- StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid));
- }
- throw new IllegalArgumentException(
- String.format("Unexpected consensusGroup %s", gid));
- })
+ conf.getSchemaRegionConsensusProtocolClass(),
+ new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort()),
+ new File(conf.getSchemaRegionConsensusDir()),
+ gid ->
+ new SchemaRegionStateMachine(
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
- conf.getConsensusProtocolClass())));
+ conf.getSchemaRegionConsensusProtocolClass())));
- private ConsensusImplHolder() {}
+ private SchemaRegionConsensusImplHolder() {}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 51616c30bd..0d22c6df90 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -19,13 +19,11 @@
package org.apache.iotdb.db.consensus.statemachine;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,18 +32,6 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
- @Override
- public TSStatus write(IConsensusRequest request) {
- try {
- return write(getFragmentInstance(request));
- } catch (IllegalArgumentException e) {
- logger.error(e.getMessage(), e);
- return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- }
- }
-
- protected abstract TSStatus write(FragmentInstance fragmentInstance);
-
@Override
public DataSet read(IConsensusRequest request) {
try {
@@ -58,7 +44,7 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
protected abstract DataSet read(FragmentInstance fragmentInstance);
- private FragmentInstance getFragmentInstance(IConsensusRequest request) {
+ protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
FragmentInstance instance;
if (request instanceof ByteBufferConsensusRequest) {
instance =
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 7fe44559f7..f0b03a451f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.BatchProcessException;
@@ -31,11 +33,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +75,31 @@ public class DataRegionStateMachine extends BaseStateMachine {
public void loadSnapshot(File latestSnapshotRootDir) {}
@Override
- protected TSStatus write(FragmentInstance fragmentInstance) {
- PlanNode planNode = fragmentInstance.getFragment().getRoot();
+ public TSStatus write(IConsensusRequest request) {
+ FragmentInstance fi;
+ long minSyncIndex = Long.MAX_VALUE; // default when the request carries no index
+ long currentIndex = -1; // default when the request carries no index
+ try {
+ if (request instanceof IndexedConsensusRequest) {
+ fi = getFragmentInstance(((IndexedConsensusRequest) request).getRequest());
+ minSyncIndex = ((IndexedConsensusRequest) request).getMinSyncIndex();
+ currentIndex = ((IndexedConsensusRequest) request).getCurrentIndex();
+ } else {
+ fi = getFragmentInstance(request);
+ }
+ PlanNode planNode = fi.getFragment().getRoot();
+ if (planNode instanceof InsertNode) {
+ logger.debug("minSyncIndex: {}", minSyncIndex);
+ logger.debug("currentIndex: {}", currentIndex);
+ }
+ return write(planNode);
+ } catch (IllegalArgumentException e) {
+ logger.error(e.getMessage(), e);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ protected TSStatus write(PlanNode planNode) {
try {
if (planNode instanceof InsertRowNode) {
region.insert((InsertRowNode) planNode);
@@ -95,7 +122,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
} catch (BatchProcessException e) {
return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
} catch (Exception e) {
- logger.error("Error in executing plan node: {}", planNode);
+ logger.error("Error in executing plan node: {}", planNode, e);
return StatusUtils.EXECUTE_STATEMENT_ERROR;
}
return StatusUtils.OK;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 888346907a..dfd6f1ef5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,11 +61,15 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
public void loadSnapshot(File latestSnapshotRootDir) {}
@Override
- protected TSStatus write(FragmentInstance fragmentInstance) {
+ public TSStatus write(IConsensusRequest request) {
logger.info("Execute write plan in SchemaRegionStateMachine");
- PlanNode planNode = fragmentInstance.getFragment().getRoot();
- TSStatus status = planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
- return status;
+ try {
+ PlanNode planNode = getFragmentInstance(request).getFragment().getRoot();
+ return planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
+ } catch (IllegalArgumentException e) {
+ logger.error(e.getMessage(), e);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 8922de59e3..35637f8140 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -24,9 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
@@ -167,8 +169,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
instance.getRegionReplicaSet().getRegionId());
switch (instance.getType()) {
case READ:
- FragmentInstanceInfo info =
- (FragmentInstanceInfo) ConsensusImpl.getInstance().read(groupId, instance).getDataset();
+ FragmentInstanceInfo info;
+ if (groupId instanceof SchemaRegionId) {
+ info =
+ (FragmentInstanceInfo)
+ SchemaRegionConsensusImpl.getInstance().read(groupId, instance).getDataset();
+ } else {
+ info =
+ (FragmentInstanceInfo)
+ DataRegionConsensusImpl.getInstance().read(groupId, instance).getDataset();
+ }
return !info.getState().isFailed();
case WRITE:
PlanNode planNode = instance.getFragment().getRoot();
@@ -179,7 +189,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
throw new FragmentInstanceDispatchException(e);
}
}
- ConsensusWriteResponse resp = ConsensusImpl.getInstance().write(groupId, instance);
+ ConsensusWriteResponse resp;
+ if (groupId instanceof SchemaRegionId) {
+ resp = SchemaRegionConsensusImpl.getInstance().write(groupId, instance);
+ } else {
+ resp = DataRegionConsensusImpl.getInstance().write(groupId, instance);
+ }
return TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode();
}
throw new UnsupportedOperationException(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 8875150162..37079c0355 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -35,7 +35,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.IoTDBStartCheck;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
@@ -150,8 +151,10 @@ public class DataNode implements DataNodeMBean {
new TEndPoint(config.getInternalIp(), config.getInternalPort()));
location.setDataBlockManagerEndPoint(
new TEndPoint(config.getInternalIp(), config.getDataBlockManagerPort()));
- location.setConsensusEndPoint(
- new TEndPoint(config.getInternalIp(), config.getConsensusPort()));
+ location.setDataRegionConsensusEndPoint(
+ new TEndPoint(config.getInternalIp(), config.getDataRegionConsensusPort()));
+ location.setSchemaRegionConsensusEndPoint(
+ new TEndPoint(config.getInternalIp(), config.getSchemaRegionConsensusPort()));
req.setDataNodeLocation(location);
TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
@@ -228,7 +231,8 @@ public class DataNode implements DataNodeMBean {
try {
// TODO: Start consensus layer in some where else
- ConsensusImpl.getInstance().start();
+ DataRegionConsensusImpl.getInstance().start();
+ SchemaRegionConsensusImpl.getInstance().start();
} catch (IOException e) {
throw new StartupException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 1a4661d5b4..1144cb807f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -32,13 +32,13 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -91,7 +91,6 @@ public class InternalServiceImpl implements InternalService.Iface {
private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
- private final IConsensus consensusImpl = ConsensusImpl.getInstance();
public InternalServiceImpl() {
super();
@@ -105,9 +104,16 @@ public class InternalServiceImpl implements InternalService.Iface {
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
switch (type) {
case READ:
- ConsensusReadResponse readResp =
- ConsensusImpl.getInstance()
- .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+ ConsensusReadResponse readResp;
+ if (groupId instanceof SchemaRegionId) {
+ readResp =
+ SchemaRegionConsensusImpl.getInstance()
+ .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+ } else {
+ readResp =
+ DataRegionConsensusImpl.getInstance()
+ .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+ }
FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
return new TSendFragmentInstanceResp(!info.getState().isFailed());
case WRITE:
@@ -126,7 +132,11 @@ public class InternalServiceImpl implements InternalService.Iface {
return response;
}
}
- resp = ConsensusImpl.getInstance().write(groupId, fragmentInstance);
+ if (groupId instanceof SchemaRegionId) {
+ resp = SchemaRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
+ } else {
+ resp = DataRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
+ }
// TODO need consider more status
response.setAccepted(
TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
@@ -183,12 +193,12 @@ public class InternalServiceImpl implements InternalService.Iface {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
TEndPoint endpoint =
new TEndPoint(
- dataNodeLocation.getConsensusEndPoint().getIp(),
- dataNodeLocation.getConsensusEndPoint().getPort());
+ dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(),
+ dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
peers.add(new Peer(schemaRegionId, endpoint));
}
ConsensusGenericResponse consensusGenericResponse =
- consensusImpl.addConsensusGroup(schemaRegionId, peers);
+ SchemaRegionConsensusImpl.getInstance().addConsensusGroup(schemaRegionId, peers);
if (consensusGenericResponse.isSuccess()) {
tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
@@ -221,12 +231,12 @@ public class InternalServiceImpl implements InternalService.Iface {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
TEndPoint endpoint =
new TEndPoint(
- dataNodeLocation.getConsensusEndPoint().getIp(),
- dataNodeLocation.getConsensusEndPoint().getPort());
+ dataNodeLocation.getDataRegionConsensusEndPoint().getIp(),
+ dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
peers.add(new Peer(dataRegionId, endpoint));
}
ConsensusGenericResponse consensusGenericResponse =
- consensusImpl.addConsensusGroup(dataRegionId, peers);
+ DataRegionConsensusImpl.getInstance().addConsensusGroup(dataRegionId, peers);
if (consensusGenericResponse.isSuccess()) {
tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
@@ -285,7 +295,15 @@ public class InternalServiceImpl implements InternalService.Iface {
PlanFragment planFragment = new PlanFragment(planFragmentId, deleteRegionNode);
FragmentInstance fragmentInstance =
new FragmentInstance(planFragment, fragmentInstanceId, null, QueryType.WRITE);
- return consensusImpl.write(consensusGroupId, fragmentInstance).getStatus();
+ if (consensusGroupId instanceof SchemaRegionId) {
+ return SchemaRegionConsensusImpl.getInstance()
+ .write(consensusGroupId, fragmentInstance)
+ .getStatus();
+ } else {
+ return DataRegionConsensusImpl.getInstance()
+ .write(consensusGroupId, fragmentInstance)
+ .getStatus();
+ }
}
public void handleClientExit() {}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index 53950b092a..7385b9cbd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -57,7 +57,8 @@ public class FragmentInstanceSerdeTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
PlanFragmentId planFragmentId = new PlanFragmentId("test", -1);
FragmentInstance fragmentInstance =
@@ -86,7 +87,8 @@ public class FragmentInstanceSerdeTest {
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
- dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1);
FragmentInstance fragmentInstance =
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 3fafea7cfd..c1a00026af 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -31,7 +31,8 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -78,23 +79,24 @@ public class InternalServiceImplTest {
IoTDB.configManager.init();
configNode = LocalConfigNode.getInstance();
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
- ConsensusImpl.getInstance().start();
+ DataRegionConsensusImpl.getInstance().start();
+ SchemaRegionConsensusImpl.getInstance().start();
}
@Before
public void setUp() throws Exception {
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
- ConsensusImpl.getInstance()
+ SchemaRegionConsensusImpl.getInstance()
.addConsensusGroup(
ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
- genPeerList(regionReplicaSet));
+ genSchemaRegionPeerList(regionReplicaSet));
internalServiceImpl = new InternalServiceImpl();
}
@After
public void tearDown() throws Exception {
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
- ConsensusImpl.getInstance()
+ SchemaRegionConsensusImpl.getInstance()
.removeConsensusGroup(
ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()));
FileUtils.deleteFully(new File(conf.getConsensusDir()));
@@ -102,7 +104,8 @@ public class InternalServiceImplTest {
@AfterClass
public static void tearDownAfterClass() throws IOException, StorageEngineException {
- ConsensusImpl.getInstance().stop();
+ DataRegionConsensusImpl.getInstance().stop();
+ SchemaRegionConsensusImpl.getInstance().stop();
IoTDB.configManager.clear();
EnvironmentUtils.cleanEnv();
}
@@ -360,20 +363,23 @@ public class InternalServiceImplTest {
.setInternalEndPoint(new TEndPoint(conf.getInternalIp(), conf.getInternalPort()))
.setDataBlockManagerEndPoint(
new TEndPoint(conf.getInternalIp(), conf.getDataBlockManagerPort()))
- .setConsensusEndPoint(new TEndPoint(conf.getInternalIp(), conf.getConsensusPort())));
+ .setDataRegionConsensusEndPoint(
+ new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()))
+ .setSchemaRegionConsensusEndPoint(
+ new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort())));
// construct fragmentInstance
return new TRegionReplicaSet(
new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0), dataNodeList);
}
- private List<Peer> genPeerList(TRegionReplicaSet regionReplicaSet) {
+ private List<Peer> genSchemaRegionPeerList(TRegionReplicaSet regionReplicaSet) {
List<Peer> peerList = new ArrayList<>();
for (TDataNodeLocation node : regionReplicaSet.getDataNodeLocations()) {
peerList.add(
new Peer(
new SchemaRegionId(regionReplicaSet.getRegionId().getId()),
- node.getConsensusEndPoint()));
+ node.getSchemaRegionConsensusEndPoint()));
}
return peerList;
}
diff --git a/server/src/test/resources/datanode1conf/iotdb-engine.properties b/server/src/test/resources/datanode1conf/iotdb-engine.properties
index 0f02933bbc..cced42b412 100644
--- a/server/src/test/resources/datanode1conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
rpc_port=6667
data_block_manager_port=8777
internal_port=9003
-consensus_port=40010
+data_region_consensus_port=40010
+schema_region_consensus_port=50010
config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
diff --git a/server/src/test/resources/datanode2conf/iotdb-engine.properties b/server/src/test/resources/datanode2conf/iotdb-engine.properties
index 429c025a6b..c570d996b7 100644
--- a/server/src/test/resources/datanode2conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
rpc_port=6669
data_block_manager_port=8779
internal_port=9005
-consensus_port=40012
+data_region_consensus_port=40012
+schema_region_consensus_port=50012
config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
diff --git a/server/src/test/resources/datanode3conf/iotdb-engine.properties b/server/src/test/resources/datanode3conf/iotdb-engine.properties
index 5a390729a6..956faee018 100644
--- a/server/src/test/resources/datanode3conf/iotdb-engine.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-engine.properties
@@ -23,7 +23,8 @@ internal_ip=127.0.0.1
rpc_port=6671
data_block_manager_port=8781
internal_port=9007
-consensus_port=40014
+data_region_consensus_port=40014
+schema_region_consensus_port=50014
config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 8ffdf3ff40..cfd5fc4157 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -74,8 +74,10 @@ struct TDataNodeLocation {
3: required TEndPoint internalEndPoint
// TEndPoint for transfering data between DataNodes
4: required TEndPoint dataBlockManagerEndPoint
- // TEndPoint for DataNode's ConsensusLayer
- 5: required TEndPoint consensusEndPoint
+ // TEndPoint for DataNode's dataRegion consensus protocol
+ 5: required TEndPoint dataRegionConsensusEndPoint
+ // TEndPoint for DataNode's schemaRegion consensus protocol
+ 6: required TEndPoint schemaRegionConsensusEndPoint
}
struct THeartbeatResp {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 7fa76108e8..6d817f29ce 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -30,10 +30,11 @@ struct TDataNodeRegisterReq {
}
struct TGlobalConfig {
- 1: required string dataNodeConsensusProtocolClass
- 2: required i32 seriesPartitionSlotNum
- 3: required string seriesPartitionExecutorClass
- 4: required i64 timePartitionInterval
+ 1: required string dataRegionConsensusProtocolClass
+ 2: required string schemaRegionConsensusProtocolClass
+ 3: required i32 seriesPartitionSlotNum
+ 4: required string seriesPartitionExecutorClass
+ 5: required i64 timePartitionInterval
}
struct TDataNodeRegisterResp {
@@ -159,13 +160,14 @@ struct TCheckUserPrivilegesReq{
// ConfigNode
struct TConfigNodeRegisterReq {
1: required common.TConfigNodeLocation configNodeLocation
- 2: required string dataNodeConsensusProtocolClass
- 3: required i32 seriesPartitionSlotNum
- 4: required string seriesPartitionExecutorClass
- 5: required i64 defaultTTL
- 6: required i64 timePartitionInterval
- 7: required i32 schemaReplicationFactor
- 8: required i32 dataReplicationFactor
+ 2: required string dataRegionConsensusProtocolClass
+ 3: required string schemaRegionConsensusProtocolClass
+ 4: required i32 seriesPartitionSlotNum
+ 5: required string seriesPartitionExecutorClass
+ 6: required i64 defaultTTL
+ 7: required i64 timePartitionInterval
+ 8: required i32 schemaReplicationFactor
+ 9: required i32 dataReplicationFactor
}
struct TConfigNodeRegisterResp {
diff --git a/thrift-multi-leader-consensus/pom.xml b/thrift-multi-leader-consensus/pom.xml
new file mode 100644
index 0000000000..7504f96722
--- /dev/null
+++ b/thrift-multi-leader-consensus/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-parent</artifactId>
+ <version>0.14.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>thrift-multi-leader-consensus</artifactId>
+ <name>rpc-thrift-multi-leader-consensus</name>
+ <description>RPC modules for the multi-leader consensus algorithm</description>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/thrift</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
new file mode 100644
index 0000000000..10c0195506
--- /dev/null
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+include "common.thrift"
+namespace java org.apache.iotdb.consensus.multileader.thrift
+
+struct TSyncLogReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+ 2: required list<binary> batches
+}
+
+struct TSyncLogRes {
+ 1: required list<common.TSStatus> status
+}
+
+service MultiLeaderConsensusIService {
+ TSyncLogRes syncLog(TSyncLogReq req)
+}
\ No newline at end of file