You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/27 07:30:11 UTC

[iotdb] 01/02: serialize datanodeid to system.properties

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch dataNodeId
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b23a0b84f8ff9a7d93fa3cfa7bf7dfa5ddd9ea44
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Apr 27 15:29:32 2022 +0800

    serialize datanodeid to system.properties
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++++++
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 33 +++++++++++++++++++---
 .../java/org/apache/iotdb/db/service/DataNode.java | 14 +++++----
 3 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 527dd4e42b..1d93eb033d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -499,6 +499,9 @@ public class IoTDBConfig {
   /** indicate whether current mode is cluster */
   private boolean isClusterMode = false;
 
+  /** the data node id for cluster mode */
+  private int dataNodeId = -1;
+
   /** Replace implementation class of influxdb protocol service */
   private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
 
@@ -2822,6 +2825,14 @@ public class IoTDBConfig {
     this.isClusterMode = isClusterMode;
   }
 
+  public int getDataNodeId() {
+    return dataNodeId;
+  }
+
+  public void setDataNodeId(int dataNodeId) {
+    this.dataNodeId = dataNodeId;
+  }
+
   public int getDataNodeSchemaCacheSize() {
     return dataNodeSchemaCacheSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 07034d28d3..8ce3d825bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@ -106,6 +106,8 @@ public class IoTDBConfigCheck {
   private static String timeEncoderValue =
       String.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
 
+  private static final String DATA_NODE_ID = "data_node_id";
+
   private static final String IOTDB_VERSION_STRING = "iotdb_version";
 
   public static IoTDBConfigCheck getInstance() {
@@ -363,10 +365,6 @@ public class IoTDBConfigCheck {
       throwException(TIME_ENCODER_KEY, timeEncoderValue);
     }
 
-    if (!(properties.getProperty(TIME_ENCODER_KEY).equals(timeEncoderValue))) {
-      throwException(TIME_ENCODER_KEY, timeEncoderValue);
-    }
-
     if (!(properties.getProperty(ENABLE_ID_TABLE).equals(enableIDTable))) {
       throwException(ENABLE_ID_TABLE, enableIDTable);
     }
@@ -378,10 +376,37 @@ public class IoTDBConfigCheck {
     if (!(properties.getProperty(SCHEMA_ENGINE_MODE).equals(schemaEngineMode))) {
       throwException(SCHEMA_ENGINE_MODE, schemaEngineMode);
     }
+
+    // properties contain DATA_NODE_ID only when start as Data node
+    if (properties.containsKey(DATA_NODE_ID)) {
+      config.setDataNodeId(Integer.parseInt(properties.getProperty(DATA_NODE_ID)));
+    }
   }
 
   private void throwException(String parameter, Object badValue) throws ConfigurationException {
     throw new ConfigurationException(
         parameter, String.valueOf(badValue), properties.getProperty(parameter));
   }
+
+  /** Persist dataNodeId under DATA_NODE_ID: store properties to the tmp file, then replace the properties file. */
+  public void serializeDataNodeId(int dataNodeId) throws IOException {
+    // create an empty tmpPropertiesFile; if it cannot be created the error is logged and the JVM exits
+    if (tmpPropertiesFile.createNewFile()) {
+      logger.info("Create system.properties.tmp {}.", tmpPropertiesFile);
+    } else {
+      logger.error("Create system.properties.tmp {} failed.", tmpPropertiesFile);
+      System.exit(-1);
+    }
+
+    try (FileOutputStream tmpFOS = new FileOutputStream(tmpPropertiesFile.toString())) {
+      properties.setProperty(DATA_NODE_ID, String.valueOf(dataNodeId));
+      properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
+      // properties (now including the data node id) are stored in the tmp file; delete the old file
+      if (propertiesFile.exists()) {
+        Files.delete(propertiesFile.toPath());
+      }
+    }
+    // tmp stream is closed here; move the tmp file into place as the new properties file
+    FileUtils.moveFile(tmpPropertiesFile, propertiesFile);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 0c474d6334..c2c835d0e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -80,8 +80,6 @@ public class DataNode implements DataNodeMBean {
 
   private TEndPoint thisNode = new TEndPoint();
 
-  private int dataNodeID;
-
   private DataNode() {
     // we do not init anything here, so that we can re-initialize the instance in IT.
   }
@@ -149,7 +147,7 @@ public class DataNode implements DataNodeMBean {
         IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
         TDataNodeRegisterReq req = new TDataNodeRegisterReq();
         TDataNodeLocation location = new TDataNodeLocation();
-        location.setDataNodeId(-1);
+        location.setDataNodeId(config.getDataNodeId());
         location.setExternalEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
         location.setInternalEndPoint(
             new TEndPoint(config.getInternalIp(), config.getInternalPort()));
@@ -164,12 +162,16 @@ public class DataNode implements DataNodeMBean {
                 == TSStatusCode.SUCCESS_STATUS.getStatusCode()
             || dataNodeRegisterResp.getStatus().getCode()
                 == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
-          dataNodeID = dataNodeRegisterResp.getDataNodeId();
+          int dataNodeID = dataNodeRegisterResp.getDataNodeId();
+          if (dataNodeID != config.getDataNodeId()) {
+            IoTDBConfigCheck.getInstance().serializeDataNodeId(dataNodeID);
+            config.setDataNodeId(dataNodeID);
+          }
           IoTDBDescriptor.getInstance().loadGlobalConfig(dataNodeRegisterResp.globalConfig);
           logger.info("Joined the cluster successfully");
           return;
         }
-      } catch (IoTDBConnectionException e) {
+      } catch (IOException | IoTDBConnectionException e) {
         logger.warn("Cannot join the cluster, because: {}", e.getMessage());
       }
 
@@ -341,6 +343,8 @@ public class DataNode implements DataNodeMBean {
     Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
   }
 
+  private void dataNodeIdChecker() {}
+
   private static class DataNodeHolder {
 
     private static final DataNode INSTANCE = new DataNode();