You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/27 05:07:50 UTC
[iotdb] branch master updated: [IOTDB-2780] Config node ratis consensus protocol implementation (#5347)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4f4b360 [IOTDB-2780] Config node ratis consensus protocol implementation (#5347)
4f4b360 is described below
commit 4f4b360724eb50871f7b856ddb9f15adb13831cf
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Mar 27 13:06:51 2022 +0800
[IOTDB-2780] Config node ratis consensus protocol implementation (#5347)
---
confignode/pom.xml | 5 +
.../resources/conf/iotdb-confignode.properties | 44 +++++--
confignode/src/assembly/resources/conf/logback.xml | 4 +-
.../iotdb/confignode/conf/ConfigNodeConf.java | 38 +++++-
.../iotdb/confignode/conf/ConfigNodeConstant.java | 1 +
.../confignode/conf/ConfigNodeDescriptor.java | 42 +++++--
.../iotdb/confignode/consensus/ConsensusType.java | 54 +++++++++
.../statemachine/PartitionRegionStateMachine.java | 15 ++-
.../iotdb/confignode/manager/ConfigManager.java | 109 ++++++++++++-----
.../iotdb/confignode/partition/PartitionTable.java | 11 +-
.../iotdb/confignode/physical/PhysicalPlan.java | 9 ++
.../iotdb/confignode/service/ConfigNode.java | 22 +---
.../confignode/service/ConfigNodeCommandLine.java | 6 +-
.../confignode/service/executor/PlanExecutor.java | 4 +-
.../service/thrift/server/ConfigNodeRPCServer.java | 21 ++--
.../server/ConfigNodeRPCServerProcessor.java | 7 +-
.../thrift/server/ConfigNodeRPCServiceHandler.java | 2 +-
.../manager/ConfigManagerManualTest.java | 132 +++++++++++++++++++++
...java => DeviceGroupHashExecutorManualTest.java} | 8 +-
.../server/ConfigNodeRPCServerProcessorTest.java | 5 +-
.../iotdb/consensus/ratis/RequestMessage.java | 23 +++-
.../iotdb/commons/service/RegisterManager.java | 2 +-
22 files changed, 455 insertions(+), 109 deletions(-)
diff --git a/confignode/pom.xml b/confignode/pom.xml
index 68b3614..874e682 100644
--- a/confignode/pom.xml
+++ b/confignode/pom.xml
@@ -56,6 +56,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.2.5</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 103cc1b..801bf17 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -33,11 +33,7 @@ config_node_rpc_port=22277
# Datatype: int
# config_node_internal_port=22278
-# used for building the ConfigNode consensus group
-# all config node address and internal port
-# every node should have the same config_node_address_lists
-# Datatype: String
-# config_node_address_lists=host0:22278,host1:22278,host2:22278
+# this feature is under development, set this as false before it is done.
# Datatype: boolean
# rpc_thrift_compression_enable=false
@@ -58,6 +54,23 @@ config_node_rpc_port=22277
# thrift_init_buffer_size=1024
####################
+### consensus protocol configuration
+####################
+
+# ConfigNodeGroup consensus protocol type
+# These consensus protocol are currently supported:
+# 1. standalone(No protocol, only supports stand-alone machine)
+# 2. ratis(Raft protocol)
+# Datatype: String
+# consensus_type=standalone
+
+# Used for building the ConfigNode consensus group
+# all config node address and internal port, use comma to distinguish
+# every node should have the same config_node_address_lists
+# Datatype: String
+# config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+
+####################
### DeviceGroup Configuration
####################
@@ -81,9 +94,9 @@ config_node_rpc_port=22277
####################
# system dir
-# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/system).
+# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/system).
# If it is absolute, system will save the data in exact location it points to.
-# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
+# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
# For windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
# system_dir=data\\system
@@ -93,9 +106,9 @@ config_node_rpc_port=22277
# data dirs
-# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/data).
+# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/data).
# If it is absolute, system will save the data in exact location it points to.
-# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
+# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
# For windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
@@ -104,6 +117,19 @@ config_node_rpc_port=22277
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# data_dirs=data/data
+
+# consensus dir
+# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/consensus).
+# If it is absolute, system will save the data in exact location it points to.
+# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
+# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data\\consensus
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# consensus_dir=data/consensus
+
####################
### Region Configuration
####################
diff --git a/confignode/src/assembly/resources/conf/logback.xml b/confignode/src/assembly/resources/conf/logback.xml
index cfef99e..4e8e0d6 100644
--- a/confignode/src/assembly/resources/conf/logback.xml
+++ b/confignode/src/assembly/resources/conf/logback.xml
@@ -135,6 +135,6 @@
<appender-ref ref="FILEALL"/>
<appender-ref ref="stdout"/>
</root>
- <logger level="info" name="org.apache.iotdb.confignode.service"/>
- <logger level="info" name="org.apache.iotdb.confignode.conf"/>
+ <logger level="info" name="org.apache.iotdb.confignode"/>
+ <!-- <logger level="info" name="org.apache.ratis"/> -->
</configuration>
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 128a54d..be19e56 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.consensus.ConsensusType;
+import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.rpc.RpcUtils;
import java.io.File;
@@ -34,8 +36,11 @@ public class ConfigNodeConf {
/** used for communication between data node and data node */
private int internalPort = 22278;
- /** every node should have the same config_node_address_lists */
- private String addressLists;
+ /** ConfigNodeGroup consensus protocol */
+ private ConsensusType consensusType = ConsensusType.STANDALONE;
+
+ /** Used for building the ConfigNode consensus group */
+ private Endpoint[] configNodeGroupAddressList = null;
/** Number of DeviceGroups per StorageGroup */
private int deviceGroupCount = 10000;
@@ -70,6 +75,10 @@ public class ConfigNodeConf {
ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.DATA_DIR
};
+ /** Consensus directory, storage consensus protocol logs */
+ private String consensusDir =
+ ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
+
private int regionReplicaCount = 3;
private int schemaRegionCount = 1;
private int dataRegionCount = 1;
@@ -87,6 +96,7 @@ public class ConfigNodeConf {
for (int i = 0; i < dataDirs.length; i++) {
dataDirs[i] = addHomeDir(dataDirs[i]);
}
+ consensusDir = addHomeDir(consensusDir);
}
private String addHomeDir(String dir) {
@@ -181,12 +191,28 @@ public class ConfigNodeConf {
this.internalPort = internalPort;
}
- public String getAddressLists() {
- return addressLists;
+ public String getConsensusDir() {
+ return consensusDir;
+ }
+
+ public void setConsensusDir(String consensusDir) {
+ this.consensusDir = consensusDir;
+ }
+
+ public ConsensusType getConsensusType() {
+ return consensusType;
+ }
+
+ public void setConsensusType(ConsensusType consensusType) {
+ this.consensusType = consensusType;
+ }
+
+ public Endpoint[] getConfigNodeGroupAddressList() {
+ return configNodeGroupAddressList;
}
- public void setAddressLists(String addressLists) {
- this.addressLists = addressLists;
+ public void setConfigNodeGroupAddressList(Endpoint[] configNodeGroupAddressList) {
+ this.configNodeGroupAddressList = configNodeGroupAddressList;
}
public int getThriftServerAwaitTimeForStopService() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
index 5629d8d..f3bc4ac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
@@ -41,6 +41,7 @@ public class ConfigNodeConstant {
public static final String DATA_DIR = "data";
public static final String CONF_DIR = "conf";
+ public static final String CONSENSUS_FOLDER = "consensus";
public static final int MIN_SUPPORTED_JDK_VERSION = 8;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index d71faee..acfce96 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.confignode.conf;
+import org.apache.iotdb.confignode.consensus.ConsensusType;
+import org.apache.iotdb.consensus.common.Endpoint;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +52,7 @@ public class ConfigNodeDescriptor {
public URL getPropsUrl() {
// Check if a config-directory was specified first.
String urlString = System.getProperty(ConfigNodeConstant.CONFIGNODE_CONF, null);
- // If it wasn't, check if a home directory was provided (This usually contains a config)
+ // If it wasn't, check if a home directory was provided
if (urlString == null) {
urlString = System.getProperty(ConfigNodeConstant.CONFIGNODE_HOME, null);
if (urlString != null) {
@@ -60,15 +63,9 @@ public class ConfigNodeDescriptor {
+ File.separatorChar
+ ConfigNodeConstant.CONF_NAME;
} else {
- // If this too wasn't provided, try to find a default config in the root of the classpath.
- URL uri = ConfigNodeConf.class.getResource("/" + ConfigNodeConstant.CONF_NAME);
- if (uri != null) {
- return uri;
- }
- LOGGER.warn(
- "Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading "
- + "config file {}, use default configuration",
- ConfigNodeConstant.CONF_NAME);
+ // When start ConfigNode with the script, the environment variables CONFIGNODE_CONF
+ // and CONFIGNODE_HOME will be set. But we didn't set these two in developer mode.
+ // Thus, just return null and use default Configuration in developer mode.
return null;
}
}
@@ -125,8 +122,9 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"config_node_internal_port", String.valueOf(conf.getInternalPort()))));
- conf.setAddressLists(
- properties.getProperty("config_node_address_lists", conf.getAddressLists()));
+ conf.setConsensusType(
+ ConsensusType.getConsensusType(
+ properties.getProperty("consensus_type", String.valueOf(conf.getConsensusType()))));
conf.setRpcAdvancedCompressionEnable(
Boolean.parseBoolean(
@@ -160,6 +158,8 @@ public class ConfigNodeDescriptor {
conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(","));
+ conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+
conf.setRegionReplicaCount(
Integer.parseInt(
properties.getProperty(
@@ -175,6 +175,24 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"data_region_count", String.valueOf(conf.getDataRegionCount()))));
+ String addresses = properties.getProperty("config_node_group_address_list", null);
+ if (addresses != null) {
+ String[] addressList = addresses.split(",");
+ Endpoint[] endpointList = new Endpoint[addressList.length];
+ for (int i = 0; i < addressList.length; i++) {
+ String[] ipPort = addressList[i].split(":");
+ if (ipPort.length != 2) {
+ throw new IOException(
+ String.format(
+ "Parsing parameter config_node_group_address_list error. "
+ + "The %d-th address must format to ip:port, but currently is %s",
+ i, addressList[i]));
+ }
+ endpointList[i] = new Endpoint(ipPort[0], Integer.parseInt(ipPort[1]));
+ }
+ conf.setConfigNodeGroupAddressList(endpointList);
+ }
+
} catch (IOException e) {
LOGGER.warn("Couldn't load ConfigNode conf file, use default config", e);
} finally {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/ConsensusType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/ConsensusType.java
new file mode 100644
index 0000000..8d242cc
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/ConsensusType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus;
+
+import java.util.Arrays;
+
+public enum ConsensusType {
+ STANDALONE("standalone"),
+ RATIS("ratis");
+
+ private final String typeName;
+
+ ConsensusType(String typeName) {
+ this.typeName = typeName;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+
+ @Override
+ public String toString() {
+ return typeName;
+ }
+
+ public static ConsensusType getConsensusType(String typeName) {
+ for (ConsensusType type : ConsensusType.values()) {
+ if (type.getTypeName().equals(typeName)) {
+ return type;
+ }
+ }
+
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown consensus type, found: %s expected: %s",
+ typeName, Arrays.toString(ConsensusType.values())));
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index c161061..2c63b40 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -38,7 +38,11 @@ public class PartitionRegionStateMachine implements IStateMachine {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
- private final PlanExecutor executor = new PlanExecutor();
+ private final PlanExecutor executor;
+
+ public PartitionRegionStateMachine() {
+ this.executor = new PlanExecutor();
+ }
@Override
public TSStatus write(IConsensusRequest request) {
@@ -74,7 +78,14 @@ public class PartitionRegionStateMachine implements IStateMachine {
@Override
public DataSet read(IConsensusRequest request) {
PhysicalPlan plan;
- if (request instanceof PhysicalPlan) {
+ if (request instanceof ByteBufferConsensusRequest) {
+ try {
+ plan = PhysicalPlan.Factory.create(((ByteBufferConsensusRequest) request).getContent());
+ } catch (IOException e) {
+ LOGGER.error("Deserialization error for write plan : {}", request);
+ return null;
+ }
+ } else if (request instanceof PhysicalPlan) {
plan = (PhysicalPlan) request;
} else {
LOGGER.error("Unexpected read plan : {}", request);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 7c9a463..1fd5767 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.commons.hash.DeviceGroupHashExecutor;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
@@ -31,15 +30,20 @@ import org.apache.iotdb.consensus.common.GroupType;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.ratis.RatisConsensus;
import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
/**
* ConfigManager maintains consistency between PartitionTables in the ConfigNodeGroup. Expose the
@@ -48,36 +52,34 @@ import java.util.Collections;
public class ConfigManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigManager.class);
+ private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
private IConsensus consensusImpl;
private ConsensusGroupId consensusGroupId;
private DeviceGroupHashExecutor hashExecutor;
- @TestOnly
- public ConfigManager(String hashExecutorClass, int deviceGroupCount) {
- setHashExecutor(hashExecutorClass, deviceGroupCount);
- }
-
- public ConfigManager() {
- ConfigNodeConf config = ConfigNodeDescriptor.getInstance().getConf();
-
- setHashExecutor(config.getDeviceGroupHashExecutorClass(), config.getDeviceGroupCount());
- setConsensusLayer(config);
+ public ConfigManager() throws IOException {
+ setHashExecutor();
+ setConsensusLayer();
}
/** Build DeviceGroupHashExecutor */
- private void setHashExecutor(String hashExecutorClass, int deviceGroupCount) {
+ private void setHashExecutor() {
try {
- Class<?> executor = Class.forName(hashExecutorClass);
+ Class<?> executor = Class.forName(conf.getDeviceGroupHashExecutorClass());
Constructor<?> executorConstructor = executor.getConstructor(int.class);
- hashExecutor = (DeviceGroupHashExecutor) executorConstructor.newInstance(deviceGroupCount);
+ hashExecutor =
+ (DeviceGroupHashExecutor) executorConstructor.newInstance(conf.getDeviceGroupCount());
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
- LOGGER.error("Couldn't Constructor DeviceGroupHashExecutor class: {}", hashExecutorClass, e);
+ LOGGER.error(
+ "Couldn't Constructor DeviceGroupHashExecutor class: {}",
+ conf.getDeviceGroupHashExecutorClass(),
+ e);
hashExecutor = null;
}
}
@@ -87,33 +89,80 @@ public class ConfigManager {
}
/** Build ConfigNodeGroup ConsensusLayer */
- private void setConsensusLayer(ConfigNodeConf config) {
- // TODO: Support other consensus protocol
- this.consensusImpl = new StandAloneConsensus(id -> new PartitionRegionStateMachine());
- // TODO: handle the possible exception that cause the Consensus fail
- try {
- this.consensusImpl.start();
- } catch (IOException e) {
- e.printStackTrace();
+ private void setConsensusLayer() throws IOException {
+ // There is only one ConfigNodeGroup
+ consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
+
+ // If consensusDir does not exist, create consensusDir
+ File consensusDir = new File(conf.getConsensusDir());
+ if (!consensusDir.exists()) {
+ if (consensusDir.mkdirs()) {
+ LOGGER.info("Make consensus dirs: {}", consensusDir);
+ } else {
+ throw new IOException(
+ String.format(
+ "Start ConfigNode failed, because couldn't make system dirs: %s.",
+ consensusDir.getAbsolutePath()));
+ }
}
- this.consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
- this.consensusImpl.addConsensusGroup(
- this.consensusGroupId,
+ // Implement specific consensus
+ switch (conf.getConsensusType()) {
+ case STANDALONE:
+ constructStandAloneConsensus();
+ break;
+ case RATIS:
+ constructRatisConsensus();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Start ConfigNode failed, unrecognized ConsensusType: "
+ + conf.getConsensusType().getTypeName());
+ }
+ }
+
+ private void constructStandAloneConsensus() throws IOException {
+ // Standalone consensus
+ consensusImpl = new StandAloneConsensus(id -> new PartitionRegionStateMachine());
+ consensusImpl.start();
+
+ // Standalone ConsensusGroup
+ consensusImpl.addConsensusGroup(
+ consensusGroupId,
Collections.singletonList(
new Peer(
- this.consensusGroupId,
- new Endpoint(config.getRpcAddress(), config.getInternalPort()))));
+ consensusGroupId, new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))));
+ }
+
+ private void constructRatisConsensus() throws IOException {
+ // Ratis consensus local implement
+ consensusImpl =
+ RatisConsensus.newBuilder()
+ .setEndpoint(new Endpoint(conf.getRpcAddress(), conf.getInternalPort()))
+ .setStateMachineRegistry(id -> new PartitionRegionStateMachine())
+ .setStorageDir(new File(conf.getConsensusDir()))
+ .build();
+ consensusImpl.start();
+
+ // Build ratis group from user properties
+ LOGGER.info(
+ "Set ConfigNode consensus group {}...",
+ Arrays.toString(conf.getConfigNodeGroupAddressList()));
+ List<Peer> peerList = new ArrayList<>();
+ for (Endpoint endpoint : conf.getConfigNodeGroupAddressList()) {
+ peerList.add(new Peer(consensusGroupId, endpoint));
+ }
+ consensusImpl.addConsensusGroup(consensusGroupId, peerList);
}
/** Transmit PhysicalPlan to confignode.consensus.statemachine */
public ConsensusWriteResponse write(PhysicalPlan plan) {
- return this.consensusImpl.write(this.consensusGroupId, plan);
+ return consensusImpl.write(consensusGroupId, plan);
}
/** Transmit PhysicalPlan to confignode.consensus.statemachine */
public ConsensusReadResponse read(PhysicalPlan plan) {
- return this.consensusImpl.read(this.consensusGroupId, plan);
+ return consensusImpl.read(consensusGroupId, plan);
}
// TODO: Interfaces for LoadBalancer control
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
index ea11a7e..9f0354d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.partition;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
@@ -40,12 +41,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class PartitionTable {
- private static final int regionReplicaCount =
- ConfigNodeDescriptor.getInstance().getConf().getRegionReplicaCount();
- private static final int schemaRegionCount =
- ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionCount();
- private static final int dataRegionCount =
- ConfigNodeDescriptor.getInstance().getConf().getDataRegionCount();
+ private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+ private static final int regionReplicaCount = conf.getRegionReplicaCount();
+ private static final int schemaRegionCount = conf.getSchemaRegionCount();
+ private static final int dataRegionCount = conf.getDataRegionCount();
private final ReentrantReadWriteLock lock;
// TODO: Serialize and Deserialize
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
index 523385a..858cc9d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.confignode.physical;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.slf4j.Logger;
@@ -64,6 +66,7 @@ public abstract class PhysicalPlan implements IConsensusRequest {
buffer.reset();
throw e;
}
+ buffer.flip();
}
protected abstract void serializeImpl(ByteBuffer buffer);
@@ -83,6 +86,12 @@ public abstract class PhysicalPlan implements IConsensusRequest {
case QueryDataNodeInfo:
plan = new QueryDataNodeInfoPlan();
break;
+ case SetStorageGroup:
+ plan = new SetStorageGroupPlan();
+ break;
+ case QueryStorageGroupSchema:
+ plan = new QueryStorageGroupSchemaPlan();
+ break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 6069f70..418ff9a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
-import org.apache.iotdb.commons.service.StartupChecks;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.service.thrift.server.ConfigNodeRPCServer;
@@ -31,6 +30,8 @@ import org.apache.iotdb.confignode.service.thrift.server.ConfigNodeRPCServerProc
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
public class ConfigNode implements ConfigNodeMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
@@ -39,7 +40,7 @@ public class ConfigNode implements ConfigNodeMBean {
"%s:%s=%s",
ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode");
- private static final RegisterManager registerManager = new RegisterManager();
+ private final RegisterManager registerManager = new RegisterManager();
private ConfigNode() {
// empty constructor
@@ -50,31 +51,20 @@ public class ConfigNode implements ConfigNodeMBean {
}
/** Register services */
- private void setUp() throws StartupException {
+ private void setUp() throws StartupException, IOException {
LOGGER.info("Setting up {}...", ConfigNodeConstant.GLOBAL_NAME);
registerManager.register(JMXService.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
- ConfigNodeRPCServerProcessor configNodeRPCServerProcessor = new ConfigNodeRPCServerProcessor();
- ConfigNodeRPCServer.getInstance().initSyncedServiceImpl(configNodeRPCServerProcessor);
+ ConfigNodeRPCServer.getInstance().initSyncedServiceImpl(new ConfigNodeRPCServerProcessor());
registerManager.register(ConfigNodeRPCServer.getInstance());
LOGGER.info("Init rpc server success");
}
public void active() {
- StartupChecks checks = new StartupChecks().withDefaultTest();
- try {
- // Startup environment check
- checks.verify();
- } catch (StartupException e) {
- LOGGER.error(
- "{}: failed to start because some checks failed. ", ConfigNodeConstant.GLOBAL_NAME, e);
- return;
- }
-
try {
setUp();
- } catch (StartupException e) {
+ } catch (StartupException | IOException e) {
LOGGER.error("Meet error while starting up.", e);
deactivate();
return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index 27d1325..52894d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.service;
import org.apache.iotdb.commons.ServerCommandLine;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.StartupChecks;
import org.apache.iotdb.confignode.conf.ConfigNodeConfCheck;
import org.slf4j.Logger;
@@ -66,7 +67,10 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
LOGGER.info("Running mode {}", mode);
if (MODE_START.equals(mode)) {
try {
- // Check parameters
+ // Startup environment check
+ StartupChecks checks = new StartupChecks().withDefaultTest();
+ checks.verify();
+ // Check special parameters
ConfigNodeConfCheck.getInstance().checkConfig();
} catch (IOException | ConfigurationException | StartupException e) {
LOGGER.error("Meet error when doing start checking", e);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index ccb57e3..c9c90af 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -29,10 +29,10 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
public class PlanExecutor {
- private final PartitionTable partitionTable = new PartitionTable();
+ private final PartitionTable partitionTable;
public PlanExecutor() {
- // empty constructor
+ this.partitionTable = new PartitionTable();
}
public DataSet executorQueryPlan(PhysicalPlan plan) throws UnknownPhysicalPlanTypeException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
index 876f059..2c3542b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServer.java
@@ -30,11 +30,14 @@ import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCServerMBean {
- private final ConfigNodeConf config = ConfigNodeDescriptor.getInstance().getConf();
+
+ private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
private ConfigNodeRPCServerProcessor configNodeRPCServerProcessor;
- private ConfigNodeRPCServer() {}
+ private ConfigNodeRPCServer() {
+ // empty constructor
+ }
@Override
public ThriftService getImplementation() {
@@ -72,10 +75,10 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
ThreadName.CONFIG_NODE_RPC_CLIENT.getName(),
getBindIP(),
getBindPort(),
- config.getRpcMaxConcurrentClientNum(),
- config.getThriftServerAwaitTimeForStopService(),
+ conf.getRpcMaxConcurrentClientNum(),
+ conf.getThriftServerAwaitTimeForStopService(),
new ConfigNodeRPCServiceHandler(configNodeRPCServerProcessor),
- config.isRpcThriftCompressionEnabled());
+ conf.isRpcThriftCompressionEnabled());
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
@@ -84,12 +87,12 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
@Override
public String getBindIP() {
- return config.getRpcAddress();
+ return conf.getRpcAddress();
}
@Override
public int getBindPort() {
- return config.getRpcPort();
+ return conf.getRpcPort();
}
public static ConfigNodeRPCServer getInstance() {
@@ -100,6 +103,8 @@ public class ConfigNodeRPCServer extends ThriftService implements ConfigNodeRPCS
private static final ConfigNodeRPCServer INSTANCE = new ConfigNodeRPCServer();
- private ConfigNodeRPCServerHolder() {}
+ private ConfigNodeRPCServerHolder() {
+ // empty constructor
+ }
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 2427a0a..9cd98e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -48,16 +48,17 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
- private final ConfigManager configManager = new ConfigManager();
+ private final ConfigManager configManager;
- public ConfigNodeRPCServerProcessor() {
- // empty constructor
+ public ConfigNodeRPCServerProcessor() throws IOException {
+ this.configManager = new ConfigManager();
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
index a186a12..6388850 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
@@ -22,7 +22,7 @@ import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
public class ConfigNodeRPCServiceHandler implements TServerEventHandler {
- private ConfigNodeRPCServerProcessor processor;
+ private final ConfigNodeRPCServerProcessor processor;
public ConfigNodeRPCServiceHandler(ConfigNodeRPCServerProcessor processor) {
this.processor = processor;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
new file mode 100644
index 0000000..270ba1c
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConfigManagerManualTest {
+
+ // TODO: Optimize this manual test to automatic test after the test environment is set up.
+ // @YongzaoDan
+
+ private static final String localhost = "0.0.0.0";
+ private static final int timeOutInMS = 2000;
+
+ private ConfigIService.Client[] clients;
+
+ /**
+ * This is a temporary test of ConfigNode's integration with the ratis-consensus protocol. To run
+ * this code, follow these steps: 1. Compile IoTDB 2. Copy at least three
+ * iotdb-confignode-0.14.0-SNAPSHOT 3. Make sure these parameters: config_node_rpc_address(all
+ * 0.0.0.0), config_node_rpc_port(22277, 22279, 22281), config_node_internal_port(22278, 22280,
+ * 22282), consensus_type(all ratis) and config_node_group_address_list(all 0.0.0.0:22278,
+ * 0.0.0.0:22280, 0.0.0.0:22282) in each iotdb-confignode.properties file are set 4. Start these
+ * ConfigNode by yourself 5. Add @Test and run
+ */
+ public void ratisConsensusTest() throws TException, InterruptedException {
+ createClients();
+
+ registerDataNodes();
+
+ queryDataNodes();
+ }
+
+ private void createClients() throws TTransportException {
+ clients = new ConfigIService.Client[3];
+ for (int i = 0; i < 3; i++) {
+ TTransport transport =
+ RpcTransportFactory.INSTANCE.getTransport(localhost, 22277 + i * 2, timeOutInMS);
+ transport.open();
+ clients[i] = new ConfigIService.Client(new TBinaryProtocol(transport));
+ }
+ }
+
+ private void registerDataNodes() throws TException {
+ for (int i = 0; i < 3; i++) {
+ DataNodeRegisterReq req = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667 + i));
+ DataNodeRegisterResp resp = clients[0].registerDataNode(req);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+ Assert.assertEquals(i, resp.getDataNodeID());
+ }
+ }
+
+ private void queryDataNodes() throws InterruptedException, TException {
+ // sleep 1s to make sure all ConfigNode in ConfigNodeGroup hold the same PartitionTable
+ TimeUnit.SECONDS.sleep(1);
+
+ for (int i = 0; i < 3; i++) {
+ Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+ Assert.assertEquals(3, msgMap.size());
+ for (int j = 0; j < 3; j++) {
+ Assert.assertNotNull(msgMap.get(j));
+ Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+ Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
+ Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
+ }
+ }
+ }
+
+ /**
+ * This is a temporary test of ConfigNode's integration with the ratis-consensus protocol. This
+ * code tests the high availability of the ratis-consensus protocol. Make sure that you have run
+ * according to the comments of ratisConsensusTest before executing this code. Next, close
+ * ConfigNode that occupies ports 22281 and 22282 on the local machine. Finally, run this test.
+ */
+ public void killTest() throws TException {
+ clients = new ConfigIService.Client[2];
+ for (int i = 0; i < 2; i++) {
+ TTransport transport =
+ RpcTransportFactory.INSTANCE.getTransport(localhost, 22277 + i * 2, timeOutInMS);
+ transport.open();
+ clients[i] = new ConfigIService.Client(new TBinaryProtocol(transport));
+ }
+
+ DataNodeRegisterResp resp =
+ clients[1].registerDataNode(new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6670)));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+ Assert.assertEquals(3, resp.getDataNodeID());
+
+ for (int i = 0; i < 2; i++) {
+ Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+ Assert.assertEquals(4, msgMap.size());
+ for (int j = 0; j < 4; j++) {
+ Assert.assertNotNull(msgMap.get(j));
+ Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+ Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
+ Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
+ }
+ }
+ }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
similarity index 94%
rename from confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index 6039043..f99daa9 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.hash;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -29,7 +30,7 @@ import java.util.Random;
* This is a not active test class, which can be used for general index testing when there is a new
* DeviceGroup hash algorithm
*/
-public class DeviceGroupHashExecutorTest {
+public class DeviceGroupHashExecutorManualTest {
private static final int deviceGroupCount = 10_000;
private static final String sg = "root.SGGroup.";
@@ -58,9 +59,8 @@ public class DeviceGroupHashExecutorTest {
return devices;
}
- public void GeneralIndexTest() {
- ConfigManager manager =
- new ConfigManager("org.apache.iotdb.commons.hash.BKDRHashExecutor", deviceGroupCount);
+ public void GeneralIndexTest() throws IOException {
+ ConfigManager manager = new ConfigManager();
int[] bucket = new int[deviceGroupCount];
Arrays.fill(bucket, 0);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index b2f169d..fd85d40 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -31,6 +31,7 @@ import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -39,7 +40,7 @@ import java.util.Map;
public class ConfigNodeRPCServerProcessorTest {
@Test
- public void registerDataNodeTest() throws TException {
+ public void registerDataNodeTest() throws TException, IOException {
ConfigNodeRPCServerProcessor processor = new ConfigNodeRPCServerProcessor();
DataNodeRegisterResp resp;
@@ -87,7 +88,7 @@ public class ConfigNodeRPCServerProcessorTest {
}
@Test
- public void setStorageGroupTest() throws TException {
+ public void setStorageGroupTest() throws TException, IOException {
ConfigNodeRPCServerProcessor processor = new ConfigNodeRPCServerProcessor();
TSStatus status;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 244a58b..feb42b5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -23,11 +23,18 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
public class RequestMessage implements Message {
+ private final Logger logger = LoggerFactory.getLogger(RequestMessage.class);
+
private final IConsensusRequest actualRequest;
private volatile ByteString serializedContent;
+ private final int DEFAULT_BUFFER_SIZE = 1024 * 10;
public RequestMessage(IConsensusRequest request) {
this.actualRequest = request;
@@ -43,10 +50,18 @@ public class RequestMessage implements Message {
if (serializedContent == null) {
synchronized (this) {
if (serializedContent == null) {
- assert actualRequest instanceof ByteBufferConsensusRequest;
- ByteBufferConsensusRequest req = (ByteBufferConsensusRequest) actualRequest;
- serializedContent = ByteString.copyFrom(req.getContent());
- req.getContent().flip(); // so that it can be read from other sources
+ ByteBufferConsensusRequest req;
+ if (actualRequest instanceof ByteBufferConsensusRequest) {
+ req = (ByteBufferConsensusRequest) actualRequest;
+ serializedContent = ByteString.copyFrom(req.getContent());
+ req.getContent().flip();
+ } else {
+ // TODO Pooling
+ ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ actualRequest.serializeRequest(byteBuffer);
+ serializedContent = ByteString.copyFrom(byteBuffer);
+ byteBuffer.flip();
+ }
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java
index db82c58..f0b1396 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/RegisterManager.java
@@ -32,7 +32,7 @@ import java.util.List;
public class RegisterManager {
private static final Logger logger = LoggerFactory.getLogger(RegisterManager.class);
- private List<IService> iServices;
+ private final List<IService> iServices;
private static long deregisterTimeOut = 10_000L;
public RegisterManager() {