You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/27 07:30:11 UTC
[iotdb] 01/02: serialize datanodeid to system.properties
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch dataNodeId
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b23a0b84f8ff9a7d93fa3cfa7bf7dfa5ddd9ea44
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Apr 27 15:29:32 2022 +0800
serialize datanodeid to system.properties
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++++++
.../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 33 +++++++++++++++++++---
.../java/org/apache/iotdb/db/service/DataNode.java | 14 +++++----
3 files changed, 49 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 527dd4e42b..1d93eb033d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -499,6 +499,9 @@ public class IoTDBConfig {
/** indicate whether current mode is cluster */
private boolean isClusterMode = false;
+ /** the data node id for cluster mode; stays -1 until an id is loaded or assigned via the setter */
+ private int dataNodeId = -1;
+
/** Replace implementation class of influxdb protocol service */
private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
@@ -2822,6 +2825,14 @@ public class IoTDBConfig {
this.isClusterMode = isClusterMode;
}
+ public int getDataNodeId() {
+ return dataNodeId;
+ }
+
+ public void setDataNodeId(int dataNodeId) {
+ this.dataNodeId = dataNodeId;
+ }
+
public int getDataNodeSchemaCacheSize() {
return dataNodeSchemaCacheSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 07034d28d3..8ce3d825bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@ -106,6 +106,8 @@ public class IoTDBConfigCheck {
private static String timeEncoderValue =
String.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ private static final String DATA_NODE_ID = "data_node_id";
+
private static final String IOTDB_VERSION_STRING = "iotdb_version";
public static IoTDBConfigCheck getInstance() {
@@ -363,10 +365,6 @@ public class IoTDBConfigCheck {
throwException(TIME_ENCODER_KEY, timeEncoderValue);
}
- if (!(properties.getProperty(TIME_ENCODER_KEY).equals(timeEncoderValue))) {
- throwException(TIME_ENCODER_KEY, timeEncoderValue);
- }
-
if (!(properties.getProperty(ENABLE_ID_TABLE).equals(enableIDTable))) {
throwException(ENABLE_ID_TABLE, enableIDTable);
}
@@ -378,10 +376,37 @@ public class IoTDBConfigCheck {
if (!(properties.getProperty(SCHEMA_ENGINE_MODE).equals(schemaEngineMode))) {
throwException(SCHEMA_ENGINE_MODE, schemaEngineMode);
}
+
+ // properties contain DATA_NODE_ID only when start as Data node
+ if (properties.containsKey(DATA_NODE_ID)) {
+ config.setDataNodeId(Integer.parseInt(properties.getProperty(DATA_NODE_ID)));
+ }
}
private void throwException(String parameter, Object badValue) throws ConfigurationException {
throw new ConfigurationException(
parameter, String.valueOf(badValue), properties.getProperty(parameter));
}
+
+ /**
+ * Persists the given DataNode id into system.properties under the data_node_id key.
+ *
+ * <p>The properties are written to system.properties.tmp first; the tmp file replaces
+ * system.properties only after it has been completely written and closed.
+ * @param dataNodeId the id to store
+ * @throws IOException if the tmp file cannot be written or moved into place
+ */
+ public void serializeDataNodeId(int dataNodeId) throws IOException {
+ // a stale tmp file left by an interrupted run must not prevent (or kill) this start-up
+ Files.deleteIfExists(tmpPropertiesFile.toPath());
+ properties.setProperty(DATA_NODE_ID, String.valueOf(dataNodeId));
+ try (FileOutputStream tmpFOS = new FileOutputStream(tmpPropertiesFile.toString())) {
+ properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
+ }
+ // swap in the new file in one step instead of delete-then-rename
+ Files.move(
+ tmpPropertiesFile.toPath(),
+ propertiesFile.toPath(),
+ java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 0c474d6334..c2c835d0e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -80,8 +80,6 @@ public class DataNode implements DataNodeMBean {
private TEndPoint thisNode = new TEndPoint();
- private int dataNodeID;
-
private DataNode() {
// we do not init anything here, so that we can re-initialize the instance in IT.
}
@@ -149,7 +147,7 @@ public class DataNode implements DataNodeMBean {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
TDataNodeRegisterReq req = new TDataNodeRegisterReq();
TDataNodeLocation location = new TDataNodeLocation();
- location.setDataNodeId(-1);
+ location.setDataNodeId(config.getDataNodeId());
location.setExternalEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
location.setInternalEndPoint(
new TEndPoint(config.getInternalIp(), config.getInternalPort()));
@@ -164,12 +162,16 @@ public class DataNode implements DataNodeMBean {
== TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| dataNodeRegisterResp.getStatus().getCode()
== TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
- dataNodeID = dataNodeRegisterResp.getDataNodeId();
+ int dataNodeID = dataNodeRegisterResp.getDataNodeId();
+ if (dataNodeID != config.getDataNodeId()) {
+ IoTDBConfigCheck.getInstance().serializeDataNodeId(dataNodeID);
+ config.setDataNodeId(dataNodeID);
+ }
IoTDBDescriptor.getInstance().loadGlobalConfig(dataNodeRegisterResp.globalConfig);
logger.info("Joined the cluster successfully");
return;
}
- } catch (IoTDBConnectionException e) {
+ } catch (IOException | IoTDBConnectionException e) {
logger.warn("Cannot join the cluster, because: {}", e.getMessage());
}
@@ -341,6 +343,8 @@ public class DataNode implements DataNodeMBean {
Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
}
+ private void dataNodeIdChecker() {} // NOTE(review): empty placeholder; no caller is added in this patch
+
private static class DataNodeHolder {
private static final DataNode INSTANCE = new DataNode();