You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/03/19 14:10:38 UTC

[iotdb] branch rel/1.1 updated: [IOTDB-5368] add port check for confignode and datanode (#9270) (#9371)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new ba19b4cd71 [IOTDB-5368] add port check for confignode and datanode (#9270) (#9371)
ba19b4cd71 is described below

commit ba19b4cd714b9944fba7dbcc2bf165dab8e894af
Author: YuFengLiu <38...@users.noreply.github.com>
AuthorDate: Sun Mar 19 22:10:31 2023 +0800

    [IOTDB-5368] add port check for confignode and datanode (#9270) (#9371)
---
 .../confignode/conf/ConfigNodeStartupCheck.java    | 40 ++++++++-----
 .../confignode/service/ConfigNodeCommandLine.java  |  7 +--
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 47 +++++++++++++++
 .../iotdb/commons/service/StartupChecks.java       | 33 ++++++-----
 .../apache/iotdb/db/conf/DataNodeStartupCheck.java | 69 ++++++++++++++++++++++
 .../java/org/apache/iotdb/db/service/DataNode.java | 37 ++++++------
 6 files changed, 180 insertions(+), 53 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index f889c131e1..47c0c26f55 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.conf;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -31,18 +32,42 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * ConfigNodeStartupCheck checks the parameters in iotdb-confignode.properties and
  * confignode-system.properties when start and restart
  */
-public class ConfigNodeStartupCheck {
+public class ConfigNodeStartupCheck extends StartupChecks {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeStartupCheck.class);
 
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
 
+  private static final int CONFIGNODE_PORTS = 2;
+
+  public ConfigNodeStartupCheck(String nodeRole) {
+    super(nodeRole);
+  }
+
+  @Override
+  protected void portCheck() throws StartupException {
+    Set<Integer> portSet = new HashSet<>();
+    portSet.add(CONF.getConsensusPort());
+    portSet.add(CONF.getInternalPort());
+    if (portSet.size() != CONFIGNODE_PORTS) {
+      throw new StartupException("ports used in configNode have repeat.");
+    } else {
+      LOGGER.info("configNode port check successful.");
+    }
+  }
+
+  @Override
   public void startUpCheck() throws StartupException, IOException, ConfigurationException {
+    envCheck();
+    portCheck();
+    verify();
     checkGlobalConfig();
     createDirsIfNecessary();
     if (SystemPropertiesUtils.isRestarted()) {
@@ -164,17 +189,4 @@ public class ConfigNodeStartupCheck {
       }
     }
   }
-
-  private static class ConfigNodeConfCheckHolder {
-
-    private static final ConfigNodeStartupCheck INSTANCE = new ConfigNodeStartupCheck();
-
-    private ConfigNodeConfCheckHolder() {
-      // Empty constructor
-    }
-  }
-
-  public static ConfigNodeStartupCheck getInstance() {
-    return ConfigNodeConfCheckHolder.INSTANCE;
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index 76423ec104..2877aead62 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
 import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
 
@@ -69,11 +68,9 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
     LOGGER.info("Running mode {}", mode);
     if (MODE_START.equals(mode)) {
       try {
-        // Startup environment check
-        StartupChecks checks = new StartupChecks(IoTDBConstant.CN_ROLE).withDefaultTest();
-        checks.verify();
         // Do ConfigNode startup checks
-        ConfigNodeStartupCheck.getInstance().startUpCheck();
+        ConfigNodeStartupCheck checks = new ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE);
+        checks.startUpCheck();
       } catch (StartupException | ConfigurationException | IOException e) {
         LOGGER.error("Meet error when doing start checking", e);
         return -1;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index 26e57fab73..5a92658f90 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -67,6 +67,8 @@ public class IoTDBClusterNodeErrorStartUpIT {
   private static final String TEST_CLUSTER_NAME = "defaultCluster";
   private static final String ERROR_CLUSTER_NAME = "errorCluster";
 
+  private static final int START_RETRY_NUM = 10;
+
   @Before
   public void setUp() throws Exception {
     EnvFactory.getEnv()
@@ -297,4 +299,49 @@ public class IoTDBClusterNodeErrorStartUpIT {
               Arrays.asList(NodeStatus.Running, NodeStatus.Running));
     }
   }
+
+  @Test
+  public void testIllegalNodeStartUp()
+      throws IOException, ClientManagerException, InterruptedException, TException {
+    ConfigNodeWrapper portConflictConfigNodeWrapper =
+        EnvFactory.getEnv().generateRandomConfigNodeWrapper();
+    DataNodeWrapper portConflictDataNodeWrapper =
+        EnvFactory.getEnv().generateRandomDataNodeWrapper();
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TShowClusterResp showClusterResp = client.showCluster();
+      int beforeStartConfigNodes = showClusterResp.getConfigNodeListSize();
+      int beforeStartDataNodes = showClusterResp.getDataNodeListSize();
+      // configure the ConfigNode with a duplicated port so that its startup check fails
+      portConflictConfigNodeWrapper.setConsensusPort(portConflictConfigNodeWrapper.getPort());
+      portConflictConfigNodeWrapper.changeConfig(
+          (MppBaseConfig) EnvFactory.getEnv().getConfig().getConfigNodeConfig(),
+          (MppCommonConfig) EnvFactory.getEnv().getConfig().getConfigNodeCommonConfig(),
+          null);
+      portConflictConfigNodeWrapper.start();
+      int afterStartConfigNodes;
+      for (int i = 0; i < START_RETRY_NUM; ++i) {
+        showClusterResp = client.showCluster();
+        afterStartConfigNodes = showClusterResp.getConfigNodeListSize();
+        Assert.assertEquals(beforeStartConfigNodes, afterStartConfigNodes);
+        Thread.sleep(1000);
+      }
+
+      // configure the DataNode with a duplicated port so that its startup check fails
+      portConflictDataNodeWrapper.setMppDataExchangePort(
+          portConflictDataNodeWrapper.getDataRegionConsensusPort());
+      portConflictDataNodeWrapper.changeConfig(
+          (MppBaseConfig) EnvFactory.getEnv().getConfig().getDataNodeConfig(),
+          (MppCommonConfig) EnvFactory.getEnv().getConfig().getDataNodeCommonConfig(),
+          null);
+      portConflictDataNodeWrapper.start();
+      int afterStartDataNodes;
+      for (int i = 0; i < START_RETRY_NUM; ++i) {
+        showClusterResp = client.showCluster();
+        afterStartDataNodes = showClusterResp.getDataNodeListSize();
+        Assert.assertEquals(beforeStartDataNodes, afterStartDataNodes);
+        Thread.sleep(1000);
+      }
+    }
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/StartupChecks.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/StartupChecks.java
index 23e9d255d3..cd247bf3b7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/StartupChecks.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/StartupChecks.java
@@ -28,10 +28,12 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-public class StartupChecks {
+public abstract class StartupChecks {
 
   private static final Logger logger = LoggerFactory.getLogger(StartupChecks.class);
-  public static final StartupCheck checkJDK =
+
+  private final String nodeRole;
+  private static final StartupCheck checkJDK =
       () -> {
         int version = JVMCommonUtils.getJdkVersion();
         if (version < IoTDBConstant.MIN_SUPPORTED_JDK_VERSION) {
@@ -43,16 +45,14 @@ public class StartupChecks {
           logger.info("JDK version is {}.", version);
         }
       };
-  private final List<StartupCheck> preChecks = new ArrayList<>();
-  private final List<StartupCheck> defaultTests = new ArrayList<>();
+  protected final List<StartupCheck> preChecks = new ArrayList<>();
 
-  public StartupChecks(String nodeRole) {
-    defaultTests.add(() -> checkJMXPort(nodeRole));
-    defaultTests.add(checkJDK);
+  protected StartupChecks(String nodeRole) {
+    this.nodeRole = nodeRole;
   }
 
   private void checkJMXPort(String nodeRole) {
-    Boolean jmxLocal = Boolean.valueOf(System.getProperty(IoTDBConstant.IOTDB_JMX_LOCAL));
+    boolean jmxLocal = Boolean.parseBoolean(System.getProperty(IoTDBConstant.IOTDB_JMX_LOCAL));
     String jmxPort = System.getProperty(IoTDBConstant.IOTDB_JMX_PORT);
 
     if (jmxLocal) {
@@ -66,7 +66,7 @@ public class StartupChecks {
               ? IoTDBConstant.DN_ENV_FILE_NAME
               : IoTDBConstant.CN_ENV_FILE_NAME;
       logger.warn(
-          "{} missing from {}.sh(Unix or OS X, if you use Windows," + " check conf/{}.bat)",
+          "{} missing from {}.sh(Unix or OS X, if you use Windows, check conf/{}.bat)",
           IoTDBConstant.IOTDB_JMX_PORT,
           filename,
           filename);
@@ -76,15 +76,18 @@ public class StartupChecks {
     }
   }
 
-  public StartupChecks withDefaultTest() {
-    preChecks.addAll(defaultTests);
-    return this;
+  protected void envCheck() {
+    preChecks.add(() -> checkJMXPort(nodeRole));
+    preChecks.add(checkJDK);
   }
-
-  /** execute every pretests. */
-  public void verify() throws StartupException {
+  /** Executes every registered pre-check. */
+  protected void verify() throws StartupException {
     for (StartupCheck check : preChecks) {
       check.execute();
     }
   }
+
+  protected abstract void portCheck() throws StartupException;
+
+  protected abstract void startUpCheck() throws Exception;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java
new file mode 100644
index 0000000000..e0147881ee
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.StartupChecks;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * DataNodeStartupCheck checks the parameters in iotdb-datanode.properties on start and restart.
+ */
+public class DataNodeStartupCheck extends StartupChecks {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeStartupCheck.class);
+  private final IoTDBConfig config;
+  private static final int DATANODE_PORTS = 6;
+
+  public DataNodeStartupCheck(String nodeRole, IoTDBConfig config) {
+    super(nodeRole);
+    this.config = config;
+  }
+
+  private void checkDataNodePortUnique() throws StartupException {
+    Set<Integer> portSet = new HashSet<>();
+    portSet.add(config.getInternalPort());
+    portSet.add(config.getMqttPort());
+    portSet.add(config.getRpcPort());
+    portSet.add(config.getMppDataExchangePort());
+    portSet.add(config.getDataRegionConsensusPort());
+    portSet.add(config.getSchemaRegionConsensusPort());
+    if (portSet.size() != DATANODE_PORTS)
+      throw new StartupException("ports used in datanode have repeat.");
+    else {
+      LOGGER.info("DataNode port check successful.");
+    }
+  }
+
+  @Override
+  protected void portCheck() {
+    preChecks.add(this::checkDataNodePortUnique);
+  }
+
+  @Override
+  public void startUpCheck() throws StartupException {
+    envCheck();
+    portCheck();
+    verify();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 45959def26..7747bcdd08 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -28,12 +28,10 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
-import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
@@ -54,6 +52,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.DataNodeStartupCheck;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.IoTDBStartCheck;
@@ -64,7 +63,6 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
@@ -130,6 +128,9 @@ public class DataNode implements DataNodeMBean {
   private final TriggerInformationUpdater triggerInformationUpdater =
       new TriggerInformationUpdater();
 
+  private static final String REGISTER_INTERRUPTION =
+      "Unexpected interruption when waiting to register to the cluster";
+
   private DataNode() {
     // we do not init anything here, so that we can re-initialize the instance in IT.
   }
@@ -141,7 +142,7 @@ public class DataNode implements DataNodeMBean {
   }
 
   public static void main(String[] args) {
-    logger.info("IoTDB-DataNode environment variables: " + IoTDBConfig.getEnvironmentVariables());
+    logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables());
     new DataNodeServerCommandLine().doMain(args);
   }
 
@@ -177,10 +178,10 @@ public class DataNode implements DataNodeMBean {
       // Serialize mutable system properties
       IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary();
 
-      logger.info("IoTDB configuration: " + config.getConfigMessage());
+      logger.info("IoTDB configuration: {}", config.getConfigMessage());
       logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
 
-    } catch (StartupException | ConfigurationException | IOException e) {
+    } catch (StartupException | IOException e) {
       logger.error("Fail to start server", e);
       if (isFirstStart) {
         // Delete the system.properties file when first start failed.
@@ -192,7 +193,7 @@ public class DataNode implements DataNodeMBean {
   }
 
   /** Prepare cluster IoTDB-DataNode */
-  private boolean prepareDataNode() throws StartupException, ConfigurationException, IOException {
+  private boolean prepareDataNode() throws StartupException, IOException {
     // Set cluster mode
     config.setClusterMode(true);
 
@@ -212,9 +213,8 @@ public class DataNode implements DataNodeMBean {
     thisNode.setPort(config.getInternalPort());
 
     // Startup checks
-    StartupChecks checks = new StartupChecks(IoTDBConstant.DN_ROLE).withDefaultTest();
-    checks.verify();
-
+    DataNodeStartupCheck checks = new DataNodeStartupCheck(IoTDBConstant.DN_ROLE, config);
+    checks.startUpCheck();
     return isFirstStart;
   }
 
@@ -254,8 +254,8 @@ public class DataNode implements DataNodeMBean {
         Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.warn("Unexpected interruption when waiting to register to the cluster", e);
-        break;
+        logger.warn(REGISTER_INTERRUPTION, e);
+        retry = -1;
       }
     }
     if (configurationResp == null) {
@@ -364,8 +364,8 @@ public class DataNode implements DataNodeMBean {
         Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.warn("Unexpected interruption when waiting to register to the cluster", e);
-        break;
+        logger.warn(REGISTER_INTERRUPTION, e);
+        retry = -1;
       }
     }
     if (dataNodeRegisterResp == null) {
@@ -424,8 +424,8 @@ public class DataNode implements DataNodeMBean {
         Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.warn("Unexpected interruption when waiting to register to the cluster", e);
-        break;
+        logger.warn(REGISTER_INTERRUPTION, e);
+        retry = -1;
       }
     }
     if (dataNodeRestartResp == null) {
@@ -458,7 +458,7 @@ public class DataNode implements DataNodeMBean {
     try {
       processPid();
       setUp();
-    } catch (StartupException | QueryProcessException e) {
+    } catch (StartupException e) {
       logger.error("Meet error while starting up.", e);
       throw new StartupException("Error in activating IoTDB DataNode.");
     }
@@ -479,7 +479,7 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  private void setUp() throws StartupException, QueryProcessException {
+  private void setUp() throws StartupException {
     logger.info("Setting up IoTDB DataNode...");
     registerManager.register(new JMXService());
     JMXService.registerMBean(getInstance(), mbeanName);
@@ -859,7 +859,6 @@ public class DataNode implements DataNodeMBean {
   private void deactivate() {
     logger.info("Deactivating IoTDB DataNode...");
     stopTriggerRelatedServices();
-    // stopThreadPools();
     registerManager.deregisterAll();
     JMXService.deregisterMBean(mbeanName);
     logger.info("IoTDB DataNode is deactivated.");