You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/08/04 07:45:05 UTC
[iotdb] branch master updated: [IOTDB-3820] Build a integration test environment for ConfigNode IT (#6753)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 23f0d63297 [IOTDB-3820] Build a integration test environment for ConfigNode IT (#6753)
23f0d63297 is described below
commit 23f0d63297196f22f701317e159a29ef1498ea57
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Thu Aug 4 15:44:59 2022 +0800
[IOTDB-3820] Build a integration test environment for ConfigNode IT (#6753)
---
integration-test/import-control.xml | 8 +
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 115 +++++-------
.../org/apache/iotdb/it/env/ConfigFactory.java | 1 +
.../org/apache/iotdb/it/env/ConfigNodeWrapper.java | 4 +
.../org/apache/iotdb/it/env/DataNodeWrapper.java | 16 ++
.../java/org/apache/iotdb/it/env/EnvUtils.java | 90 +++++++++
.../java/org/apache/iotdb/it/env/MppConfig.java | 22 +++
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 28 +++
.../apache/iotdb/it/env/StandaloneOnMppEnv.java | 5 +-
.../org/apache/iotdb/itbase/env/BaseConfig.java | 25 +++
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 15 ++
.../org/apache/iotdb/db/it/IoTDBConfigNodeIT.java | 209 +++++++++++++++++++++
.../org/apache/iotdb/db/it/env/StandaloneEnv.java | 30 +++
13 files changed, 502 insertions(+), 66 deletions(-)
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index e7de9ee378..88aeb34aa7 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -32,4 +32,12 @@
<allow pkg="org\.apache\.iotdb\.it.*" regex="true"/>
<allow pkg="org\.apache\.iotdb\.db\.it\.utils\.TestUtils.*" regex="true"/>
<allow pkg="org\.apache\.iotdb\.db\.constant\.TestConstant.*" regex="true"/>
+ <allow pkg="org\.apache\.iotdb\.common\.rpc\.thrift.*" regex="true"/>
+ <allow pkg="org\.apache\.iotdb\.confignode\.rpc\.thrift.*" regex="true"/>
+ <allow pkg="org\.apache\.iotdb\.db\.client.*" regex="true"/>
+ <allow pkg="org\.apache\.iotdb\.rpc.*" regex="true"/>
+ <allow pkg="org\.apache\.commons\.lang3.*" regex="true"/>
+ <allow pkg="org\.apache\.thrift.*" regex="true"/>
+ <allow pkg="org\.apache\.iotdb\.commons\.client\.sync.*" regex="true"/>
+
</import-control>
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 14767da0d1..5aed5e97ca 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -18,6 +18,11 @@
*/
package org.apache.iotdb.it.env;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.it.framework.IoTDBTestLogger;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.itbase.runtime.ClusterTestConnection;
@@ -29,7 +34,6 @@ import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.Constant;
import org.apache.iotdb.jdbc.IoTDBConnection;
-import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import java.io.File;
@@ -43,7 +47,6 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.iotdb.jdbc.Config.VERSION;
@@ -54,8 +57,6 @@ public abstract class AbstractEnv implements BaseEnv {
private final int NODE_START_TIMEOUT = 100;
private final int PROBE_TIMEOUT_MS = 2000;
private final int NODE_NETWORK_TIMEOUT_MS = 65_000;
- private final String lockFilePath =
- System.getProperty("user.dir") + File.separator + "target" + File.separator + "lock-";
protected List<ConfigNodeWrapper> configNodeWrapperList = Collections.emptyList();
protected List<DataNodeWrapper> dataNodeWrapperList = Collections.emptyList();
private final Random rand = new Random();
@@ -69,7 +70,8 @@ public abstract class AbstractEnv implements BaseEnv {
final String testMethodName = getTestMethodName();
ConfigNodeWrapper seedConfigNodeWrapper =
- new ConfigNodeWrapper(true, "", testClassName, testMethodName, searchAvailablePorts());
+ new ConfigNodeWrapper(
+ true, "", testClassName, testMethodName, EnvUtils.searchAvailablePorts());
seedConfigNodeWrapper.createDir();
seedConfigNodeWrapper.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
seedConfigNodeWrapper.start();
@@ -81,11 +83,15 @@ public abstract class AbstractEnv implements BaseEnv {
for (int i = 1; i < configNodesNum; i++) {
ConfigNodeWrapper configNodeWrapper =
new ConfigNodeWrapper(
- false, targetConfigNode, testClassName, testMethodName, searchAvailablePorts());
+ false,
+ targetConfigNode,
+ testClassName,
+ testMethodName,
+ EnvUtils.searchAvailablePorts());
this.configNodeWrapperList.add(configNodeWrapper);
configNodeEndpoints.add(configNodeWrapper.getIpAndPortString());
configNodeWrapper.createDir();
- configNodeWrapper.changeConfig(null);
+ configNodeWrapper.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
configNodesDelegate.addRequest(
() -> {
configNodeWrapper.start();
@@ -105,7 +111,7 @@ public abstract class AbstractEnv implements BaseEnv {
for (int i = 0; i < dataNodesNum; i++) {
DataNodeWrapper dataNodeWrapper =
new DataNodeWrapper(
- targetConfigNode, testClassName, testMethodName, searchAvailablePorts());
+ targetConfigNode, testClassName, testMethodName, EnvUtils.searchAvailablePorts());
this.dataNodeWrapperList.add(dataNodeWrapper);
dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString());
dataNodeWrapper.createDir();
@@ -134,7 +140,7 @@ public abstract class AbstractEnv implements BaseEnv {
nodeWrapper.stop();
nodeWrapper.waitingToShutDown();
nodeWrapper.destroyDir();
- String lockPath = getLockFilePath(nodeWrapper.getPort());
+ String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
if (!new File(lockPath).delete()) {
logger.error("Delete lock file {} failed", lockPath);
}
@@ -195,60 +201,6 @@ public abstract class AbstractEnv implements BaseEnv {
cleanupEnvironment();
}
- public final int[] searchAvailablePorts() {
- do {
- int randomPortStart = 1000 + (int) (Math.random() * (1999 - 1000));
- randomPortStart = randomPortStart * 10 + 1;
- File lockFile = new File(getLockFilePath(randomPortStart));
- if (lockFile.exists()) {
- continue;
- }
-
- List<Integer> requiredPorts =
- IntStream.rangeClosed(randomPortStart, randomPortStart + 9)
- .boxed()
- .collect(Collectors.toList());
- try {
- if (checkPortsAvailable(requiredPorts) && lockFile.createNewFile()) {
- return requiredPorts.stream().mapToInt(Integer::intValue).toArray();
- }
- } catch (IOException e) {
- // ignore
- }
- } while (true);
- }
-
- private boolean checkPortsAvailable(List<Integer> ports) {
- String cmd = getSearchAvailablePortCmd(ports);
- try {
- Process proc = Runtime.getRuntime().exec(cmd);
- return proc.waitFor() == 1;
- } catch (IOException e) {
- // ignore
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return false;
- }
-
- private String getSearchAvailablePortCmd(List<Integer> ports) {
- if (SystemUtils.IS_OS_WINDOWS) {
- return getWindowsSearchPortCmd(ports);
- }
- return getUnixSearchPortCmd(ports);
- }
-
- private String getWindowsSearchPortCmd(List<Integer> ports) {
- String cmd = "netstat -aon -p tcp | findStr ";
- return cmd
- + ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" "));
- }
-
- private String getUnixSearchPortCmd(List<Integer> ports) {
- String cmd = "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E ";
- return cmd + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\"";
- }
-
@Override
public Connection getConnection() throws SQLException {
return new ClusterTestConnection(getWriteConnection(null), getReadConnections(null));
@@ -349,7 +301,40 @@ public abstract class AbstractEnv implements BaseEnv {
}
}
- private String getLockFilePath(int port) {
- return lockFilePath + port;
+ @Override
+ public List<ConfigNodeWrapper> getConfigNodeWrapperList() {
+ return configNodeWrapperList;
+ }
+
+ @Override
+ public void setConfigNodeWrapperList(List<ConfigNodeWrapper> configNodeWrapperList) {
+ this.configNodeWrapperList = configNodeWrapperList;
+ }
+
+ @Override
+ public List<DataNodeWrapper> getDataNodeWrapperList() {
+ return dataNodeWrapperList;
+ }
+
+ @Override
+ public void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList) {
+ this.dataNodeWrapperList = dataNodeWrapperList;
+ }
+
+ @Override
+ public IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException {
+ IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager =
+ new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+ for (int i = 0; i < 30; i++) {
+ try {
+ return clientManager.borrowClient(
+ new TEndPoint(
+ configNodeWrapperList.get(0).getIp(), configNodeWrapperList.get(0).getPort()));
+ } catch (IOException ignored) {
+ }
+ }
+ throw new IOException("Failed to get config node connection");
}
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigFactory.java
index 22f48193b5..ec84b74510 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigFactory.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigFactory.java
@@ -38,6 +38,7 @@ public class ConfigFactory {
break;
case "LocalStandaloneOnMpp":
case "Cluster1":
+ case "Cluster2":
config = new MppConfig();
break;
case "Remote":
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
index 4fdcdda362..5b6053227d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
@@ -92,4 +92,8 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
"org.apache.iotdb.confignode.service.ConfigNode",
"-s"));
}
+
+ public int getConsensusPort() {
+ return consensusPort;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index 53511f3803..c3a9ad7c1f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -88,4 +88,20 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
protected String mainClassName() {
return "org.apache.iotdb.db.service.DataNode";
}
+
+ public int getMppDataExchangePort() {
+ return mppDataExchangePort;
+ }
+
+ public int getInternalPort() {
+ return internalPort;
+ }
+
+ public int getDataRegionConsensusPort() {
+ return dataRegionConsensusPort;
+ }
+
+ public int getSchemaRegionConsensusPort() {
+ return schemaRegionConsensusPort;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvUtils.java
new file mode 100644
index 0000000000..381c5cdbe7
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.it.env;
+
+import org.apache.commons.lang3.SystemUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class EnvUtils {
+ private static final String lockFilePath =
+ System.getProperty("user.dir") + File.separator + "target" + File.separator + "lock-";
+
+ public static int[] searchAvailablePorts() {
+ do {
+ int randomPortStart = 1000 + (int) (Math.random() * (1999 - 1000));
+ randomPortStart = randomPortStart * 10 + 1;
+ File lockFile = new File(getLockFilePath(randomPortStart));
+ if (lockFile.exists()) {
+ continue;
+ }
+
+ List<Integer> requiredPorts =
+ IntStream.rangeClosed(randomPortStart, randomPortStart + 9)
+ .boxed()
+ .collect(Collectors.toList());
+ try {
+ if (checkPortsAvailable(requiredPorts) && lockFile.createNewFile()) {
+ return requiredPorts.stream().mapToInt(Integer::intValue).toArray();
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ } while (true);
+ }
+
+ private static boolean checkPortsAvailable(List<Integer> ports) {
+ String cmd = getSearchAvailablePortCmd(ports);
+ try {
+ Process proc = Runtime.getRuntime().exec(cmd);
+ return proc.waitFor() == 1;
+ } catch (IOException e) {
+ // ignore
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ private static String getSearchAvailablePortCmd(List<Integer> ports) {
+ if (SystemUtils.IS_OS_WINDOWS) {
+ return getWindowsSearchPortCmd(ports);
+ }
+ return getUnixSearchPortCmd(ports);
+ }
+
+ private static String getWindowsSearchPortCmd(List<Integer> ports) {
+ String cmd = "netstat -aon -p tcp | findStr ";
+ return cmd
+ + ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" "));
+ }
+
+ private static String getUnixSearchPortCmd(List<Integer> ports) {
+ String cmd = "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E ";
+ return cmd + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\"";
+ }
+
+ public static String getLockFilePath(int port) {
+ return lockFilePath + port;
+ }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 89a2611fcc..b1c57791b8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -204,4 +204,26 @@ public class MppConfig implements BaseConfig {
engineProperties.setProperty("max_tsblock_line_number", String.valueOf(maxTsBlockLineNumber));
return this;
}
+
+ @Override
+ public BaseConfig setConfigNodeConsesusProtocolClass(String configNodeConsesusProtocolClass) {
+ confignodeProperties.setProperty(
+ "config_node_consensus_protocol_class", configNodeConsesusProtocolClass);
+ return this;
+ }
+
+ @Override
+ public BaseConfig setSchemaRegionConsensusProtocolClass(
+ String schemaRegionConsensusProtocolClass) {
+ confignodeProperties.setProperty(
+ "schema_region_consensus_protocol_class", schemaRegionConsensusProtocolClass);
+ return this;
+ }
+
+ @Override
+ public BaseConfig setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+ confignodeProperties.setProperty(
+ "data_region_consensus_protocol_class", dataRegionConsensusProtocolClass);
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index a9be62426f..23c0a41f8d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -18,14 +18,17 @@
*/
package org.apache.iotdb.it.env;
+import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.Constant;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.List;
import static org.apache.iotdb.jdbc.Config.VERSION;
import static org.junit.Assert.fail;
@@ -113,4 +116,29 @@ public class RemoteServerEnv implements BaseEnv {
public void dumpTestJVMSnapshot() {
// Do nothing
}
+
+ @Override
+ public List<ConfigNodeWrapper> getConfigNodeWrapperList() {
+ return null;
+ }
+
+ @Override
+ public void setConfigNodeWrapperList(List<ConfigNodeWrapper> configNodeWrapperList) {
+ // Do nothing
+ }
+
+ @Override
+ public List<DataNodeWrapper> getDataNodeWrapperList() {
+ return null;
+ }
+
+ @Override
+ public void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList) {
+ // Do nothing
+ }
+
+ @Override
+ public IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException {
+ return null;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java
index 5caefb5bce..a765c675d6 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java
@@ -31,7 +31,10 @@ public class StandaloneOnMppEnv extends AbstractEnv {
private void initEnvironment() {
DataNodeWrapper dataNodeWrapper =
new StandaloneDataNodeWrapper(
- null, super.getTestClassName(), super.getTestMethodName(), searchAvailablePorts());
+ null,
+ super.getTestClassName(),
+ super.getTestMethodName(),
+ EnvUtils.searchAvailablePorts());
dataNodeWrapper.createDir();
dataNodeWrapper.changeConfig(ConfigFactory.getConfig().getEngineProperties());
dataNodeWrapper.start();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 8066de123d..36cdd9bc49 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -193,4 +193,29 @@ public interface BaseConfig {
default BaseConfig setMaxTsBlockLineNumber(int maxTsBlockLineNumber) {
return this;
}
+
+ default BaseConfig setConfigNodeConsesusProtocolClass(String configNodeConsesusProtocolClass) {
+ return this;
+ }
+
+ default String getConfigNodeConsesusProtocolClass() {
+ return "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
+ }
+
+ default BaseConfig setSchemaRegionConsensusProtocolClass(
+ String schemaRegionConsensusProtocolClass) {
+ return this;
+ }
+
+ default String getSchemaRegionConsensusProtocolClass() {
+ return "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
+ }
+
+ default BaseConfig setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass) {
+ return this;
+ }
+
+ default String getDataRegionConsensusProtocolClass() {
+ return "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 8e5650d44b..1f7f43cc47 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -18,10 +18,15 @@
*/
package org.apache.iotdb.itbase.env;
+import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.it.env.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.DataNodeWrapper;
import org.apache.iotdb.jdbc.Constant;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.List;
public interface BaseEnv {
@@ -40,4 +45,14 @@ public interface BaseEnv {
void setTestMethodName(String testCaseName);
void dumpTestJVMSnapshot();
+
+ List<ConfigNodeWrapper> getConfigNodeWrapperList();
+
+ void setConfigNodeWrapperList(List<ConfigNodeWrapper> configNodeWrapperList);
+
+ List<DataNodeWrapper> getDataNodeWrapperList();
+
+ void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList);
+
+ IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java
new file mode 100644
index 0000000000..248873524d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.DataNodeWrapper;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.EnvUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBConfigNodeIT {
+
+ protected static String originalConfigNodeConsensusProtocolClass;
+ protected static String originalSchemaRegionConsensusProtocolClass;
+ protected static String originalDataRegionConsensusProtocolClass;
+
+ private final int retryNum = 30;
+
+ @Before
+ public void setUp() throws Exception {
+ originalConfigNodeConsensusProtocolClass =
+ ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+ originalSchemaRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
+ originalDataRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
+
+ ConfigFactory.getConfig()
+ .setConfigNodeConsesusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+ ConfigFactory.getConfig()
+ .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+
+ EnvFactory.getEnv().initBeforeClass();
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+ ConfigFactory.getConfig()
+ .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+ }
+
+ private TShowClusterResp getClusterNodeInfos(
+ IConfigNodeRPCService.Iface client, int expectedConfigNodeNum, int expectedDataNodeNum)
+ throws TException, InterruptedException {
+ TShowClusterResp clusterNodes = null;
+ for (int i = 0; i < retryNum; i++) {
+ clusterNodes = client.showCluster();
+ if (clusterNodes.getConfigNodeListSize() == expectedConfigNodeNum
+ && clusterNodes.getDataNodeListSize() == expectedDataNodeNum) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ assertEquals(expectedConfigNodeNum, clusterNodes.getConfigNodeListSize());
+ assertEquals(expectedDataNodeNum, clusterNodes.getDataNodeListSize());
+
+ return clusterNodes;
+ }
+
+ private void checkNodeConfig(
+ List<TConfigNodeLocation> configNodeList,
+ List<TDataNodeLocation> dataNodeList,
+ List<ConfigNodeWrapper> configNodeWrappers,
+ List<DataNodeWrapper> dataNodeWrappers) {
+ // check ConfigNode
+ for (TConfigNodeLocation configNodeLocation : configNodeList) {
+ boolean found = false;
+ for (ConfigNodeWrapper configNodeWrapper : configNodeWrappers) {
+ if (configNodeWrapper.getIp().equals(configNodeLocation.getInternalEndPoint().getIp())
+ && configNodeWrapper.getPort() == configNodeLocation.getInternalEndPoint().getPort()
+ && configNodeWrapper.getConsensusPort()
+ == configNodeLocation.getConsensusEndPoint().getPort()) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+
+ // check DataNode
+ for (TDataNodeLocation dataNodeLocation : dataNodeList) {
+ boolean found = false;
+ for (DataNodeWrapper dataNodeWrapper : dataNodeWrappers) {
+ if (dataNodeWrapper.getIp().equals(dataNodeLocation.getClientRpcEndPoint().getIp())
+ && dataNodeWrapper.getPort() == dataNodeLocation.getClientRpcEndPoint().getPort()
+ && dataNodeWrapper.getInternalPort() == dataNodeLocation.getInternalEndPoint().getPort()
+ && dataNodeWrapper.getSchemaRegionConsensusPort()
+ == dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort()
+ && dataNodeWrapper.getDataRegionConsensusPort()
+ == dataNodeLocation.getDataRegionConsensusEndPoint().getPort()) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+ }
+
+ @Test
+ public void removeAndStopConfigNodeTest() {
+ TShowClusterResp clusterNodes;
+ TSStatus status;
+
+ List<ConfigNodeWrapper> configNodeWrappers = EnvFactory.getEnv().getConfigNodeWrapperList();
+ List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ // add ConfigNode
+ for (int i = 0; i < 2; i++) {
+ ConfigNodeWrapper configNodeWrapper =
+ new ConfigNodeWrapper(
+ false,
+ configNodeWrappers.get(0).getIpAndPortString(),
+ "IoTDBConfigNodeIT",
+ "removeAndStopConfigNodeTest",
+ EnvUtils.searchAvailablePorts());
+ configNodeWrapper.createDir();
+ configNodeWrapper.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
+ configNodeWrapper.start();
+ configNodeWrappers.add(configNodeWrapper);
+ }
+ EnvFactory.getEnv().setConfigNodeWrapperList(configNodeWrappers);
+ clusterNodes = getClusterNodeInfos(client, 3, 3);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), clusterNodes.getStatus().getCode());
+ List<TConfigNodeLocation> configNodeLocationList = clusterNodes.getConfigNodeList();
+ List<TDataNodeLocation> dataNodeLocationList = clusterNodes.getDataNodeList();
+ checkNodeConfig(
+ configNodeLocationList, dataNodeLocationList, configNodeWrappers, dataNodeWrappers);
+
+ // test remove ConfigNode
+ TConfigNodeLocation removedConfigNodeLocation = clusterNodes.getConfigNodeList().get(1);
+ for (int i = 0; i < retryNum; i++) {
+ Thread.sleep(1000);
+ status = client.removeConfigNode(removedConfigNodeLocation);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()) {
+ break;
+ }
+ }
+
+ clusterNodes = getClusterNodeInfos(client, 2, 3);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), clusterNodes.getStatus().getCode());
+ checkNodeConfig(
+ clusterNodes.getConfigNodeList(),
+ clusterNodes.getDataNodeList(),
+ configNodeWrappers,
+ dataNodeWrappers);
+
+ List<TConfigNodeLocation> configNodeList = clusterNodes.getConfigNodeList();
+ for (TConfigNodeLocation configNodeLocation : configNodeList) {
+ assertNotEquals(removedConfigNodeLocation, configNodeLocation);
+ }
+
+ // test stop ConfigNode
+ status = client.stopConfigNode(removedConfigNodeLocation);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index dfc4d4fa9f..c25f4d9e3a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -18,14 +18,19 @@
*/
package org.apache.iotdb.db.it.env;
+import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.it.env.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.DataNodeWrapper;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.Constant;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.List;
import static org.apache.iotdb.jdbc.Config.VERSION;
import static org.junit.Assert.fail;
@@ -94,4 +99,29 @@ public class StandaloneEnv implements BaseEnv {
public void dumpTestJVMSnapshot() {
// Do nothing
}
+
+ @Override
+ public List<ConfigNodeWrapper> getConfigNodeWrapperList() {
+ return null;
+ }
+
+ @Override
+ public void setConfigNodeWrapperList(List<ConfigNodeWrapper> configNodeWrapperList) {
+ // Do nothing
+ }
+
+ @Override
+ public List<DataNodeWrapper> getDataNodeWrapperList() {
+ return null;
+ }
+
+ @Override
+ public void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList) {
+ // Do nothing
+ }
+
+ @Override
+ public IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException {
+ return null;
+ }
}