You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/16 12:15:25 UTC
[iotdb] branch master updated: [IOTDB-4658] Fix ConfigNode restart bug and add Cluster restart IT (#7623)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 26d6c9e265 [IOTDB-4658] Fix ConfigNode restart bug and add Cluster restart IT (#7623)
26d6c9e265 is described below
commit 26d6c9e265e13ce23a93b591f1efae7a1ddcb75c
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Oct 16 20:15:18 2022 +0800
[IOTDB-4658] Fix ConfigNode restart bug and add Cluster restart IT (#7623)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +-
.../confignode/conf/SystemPropertiesUtils.java | 24 +-
.../statemachine/PartitionRegionStateMachine.java | 14 +-
.../iotdb/confignode/manager/node/NodeManager.java | 6 +-
.../iotdb/confignode/persistence/NodeInfo.java | 2 -
.../procedure/env/ConfigNodeProcedureEnv.java | 16 -
.../impl/node/AddConfigNodeProcedure.java | 7 +-
.../iotdb/confignode/service/ConfigNode.java | 14 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 338 ---------------------
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 76 ++++-
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 14 +-
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 8 +-
.../iotdb/confignode/IoTDBClusterPartitionIT.java | 52 +++-
.../iotdb/confignode/IoTDBClusterRestartIT.java | 88 ++++++
.../apache/iotdb/confignode/IoTDBConfigNodeIT.java | 8 +-
.../confignode/IoTDBConfigNodeSnapshotIT.java | 2 +-
.../iotdb/confignode/IoTDBStorageGroupIT.java | 183 +++++++++++
.../org/apache/iotdb/db/it/env/StandaloneEnv.java | 14 +-
19 files changed, 451 insertions(+), 419 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 627ba6ccb5..7f2fb656f5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -32,7 +32,7 @@ public class ConfigNodeConfig {
/**
* the config node id for cluster mode, the default value -1 should be changed after join cluster
*/
- private int configNodeId = 0;
+ private volatile int configNodeId = -1;
/** could set ip or hostname */
private String internalAddress = "0.0.0.0";
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 9c166589d1..71acfbfb51 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -193,6 +193,9 @@ public class SystemPropertiesUtils {
*/
public static void storeSystemParameters() throws IOException {
Properties systemProperties = getSystemProperties();
+
+ systemProperties.setProperty("config_node_id", String.valueOf(conf.getConfigNodeId()));
+
// Startup configuration
systemProperties.setProperty("internal_address", String.valueOf(conf.getInternalAddress()));
systemProperties.setProperty("internal_port", String.valueOf(conf.getInternalPort()));
@@ -239,14 +242,21 @@ public class SystemPropertiesUtils {
storeSystemProperties(systemProperties);
}
- public static void storeConfigNodeId(int nodeId) throws IOException {
- if (!systemPropertiesFile.exists()) {
- return;
- }
-
+ /**
+ * Load the config_node_id from the confignode-system.properties file. This method is only
+ * invoked when the ConfigNode restarts.
+ *
+ * @return The property of config_node_id in confignode-system.properties file
+ * @throws IOException When the file cannot be loaded or holds no valid config_node_id
+ */
+ public static int loadConfigNodeId() throws IOException {
Properties systemProperties = getSystemProperties();
- systemProperties.setProperty("config_node_id", String.valueOf(nodeId));
- storeSystemProperties(systemProperties);
+ try {
+ return Integer.parseInt(systemProperties.getProperty("config_node_id", null));
+ } catch (NumberFormatException e) {
+ throw new IOException(
+ "The parameter config_node_id doesn't exist in confignode-system.properties");
+ }
}
private static synchronized Properties getSystemProperties() throws IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index a901e5d187..dae61189d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.confignode.consensus.statemachine;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
@@ -48,12 +47,10 @@ public class PartitionRegionStateMachine
private final ConfigPlanExecutor executor;
private ConfigManager configManager;
private final TEndPoint currentNodeTEndPoint;
- private final int currentNodeId;
public PartitionRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) {
this.executor = executor;
this.configManager = configManager;
- this.currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
this.currentNodeTEndPoint =
new TEndPoint()
.setIp(ConfigNodeDescriptor.getInstance().getConf().getInternalAddress())
@@ -149,8 +146,10 @@ public class PartitionRegionStateMachine
@Override
public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
- TConfigNodeLocation newLeaderConfigNodeLocation =
- configManager.getConfigNodeLocation(currentNodeId);
+ // Read currentNodeId here rather than caching it in the constructor, because the
+ // ConfigNodeId may not be initialized yet when the PartitionRegionStateMachine is built
+ int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+
if (currentNodeId == newLeaderId) {
LOGGER.info(
"Current node [nodeId: {}, ip:port: {}] becomes Leader",
@@ -162,11 +161,10 @@ public class PartitionRegionStateMachine
configManager.getPartitionManager().startRegionCleaner();
} else {
LOGGER.info(
- "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}, ip:port: {}]",
+ "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
currentNodeId,
currentNodeTEndPoint,
- newLeaderId,
- newLeaderConfigNodeLocation.getInternalEndPoint());
+ newLeaderId);
configManager.getProcedureManager().shiftExecutor(false);
configManager.getLoadManager().stopLoadBalancingService();
configManager.getNodeManager().stopHeartbeatService();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 7cb7159060..9e2558838d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -265,7 +265,7 @@ public class NodeManager {
.setConfigNodeId(ERROR_STATUS_NODE_ID);
}
- int nodeId = generateNodeId();
+ int nodeId = nodeInfo.generateNextNodeId();
req.getConfigNodeLocation().setConfigNodeId(nodeId);
configManager.getProcedureManager().addConfigNode(req);
@@ -778,8 +778,4 @@ public class NodeManager {
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
-
- public int generateNodeId() {
- return nodeInfo.generateNextNodeId();
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 2d3876a3f3..7adcf7e389 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -305,8 +305,6 @@ public class NodeInfo implements SnapshotProcessor {
registeredConfigNodes.add(applyConfigNodePlan.getConfigNodeLocation());
SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes));
- SystemPropertiesUtils.storeConfigNodeId(
- applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
LOGGER.info(
"Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
applyConfigNodePlan.getConfigNodeLocation(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 2bbab37673..b1f73ff5de 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -94,20 +94,8 @@ public class ConfigNodeProcedureEnv {
private final DataNodeRemoveHandler dataNodeRemoveHandler;
- private static boolean skipForTest = false;
-
- private static boolean invalidCacheResult = true;
-
private final ReentrantLock removeConfigNodeLock;
- public static void setSkipForTest(boolean skipForTest) {
- ConfigNodeProcedureEnv.skipForTest = skipForTest;
- }
-
- public static void setInvalidCacheResult(boolean result) {
- ConfigNodeProcedureEnv.invalidCacheResult = result;
- }
-
public ConfigNodeProcedureEnv(ConfigManager configManager, ProcedureScheduler scheduler) {
this.configManager = configManager;
this.scheduler = scheduler;
@@ -148,10 +136,6 @@ public class ConfigNodeProcedureEnv {
* @throws TException Thrift IOE
*/
public boolean invalidateCache(String storageGroupName) throws IOException, TException {
- // TODO: Remove it after IT is supported
- if (skipForTest) {
- return invalidCacheResult;
- }
List<TDataNodeConfiguration> allDataNodes =
configManager.getNodeManager().getRegisteredDataNodes();
TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 7c3e0cd582..9d7d0ab760 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -61,19 +61,22 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
setNextState(AddConfigNodeState.CREATE_PEER);
break;
case CREATE_PEER:
+ LOG.info("Executing createPeerForConsensusGroup on {}...", tConfigNodeLocation);
env.addConsensusGroup(tConfigNodeLocation);
setNextState(AddConfigNodeState.ADD_PEER);
- LOG.info("Add consensus group {}", tConfigNodeLocation);
+ LOG.info("Successfully createPeerForConsensusGroup on {}", tConfigNodeLocation);
break;
case ADD_PEER:
+ LOG.info("Executing addPeer {}...", tConfigNodeLocation);
env.addConfigNodePeer(tConfigNodeLocation);
setNextState(AddConfigNodeState.REGISTER_SUCCESS);
- LOG.info("Add Peer of {}", tConfigNodeLocation);
+ LOG.info("Successfully addPeer {}", tConfigNodeLocation);
break;
case REGISTER_SUCCESS:
env.notifyRegisterSuccess(tConfigNodeLocation);
env.applyConfigNode(tConfigNodeLocation);
env.broadCastTheLatestConfigNodeGroup();
+ LOG.info("The ConfigNode: {} is successfully added to the cluster", tConfigNodeLocation);
return Flow.NO_MORE_STATE;
}
} catch (Exception e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index fb26ccc4f9..81aa8bcc82 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -87,6 +87,8 @@ public class ConfigNode implements ConfigNodeMBean {
/* Restart */
if (SystemPropertiesUtils.isRestarted()) {
+ LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
+ CONF.setConfigNodeId(SystemPropertiesUtils.loadConfigNodeId());
configManager.initConsensusManager();
setUpRPCService();
LOGGER.info(
@@ -96,10 +98,18 @@ public class ConfigNode implements ConfigNodeMBean {
/* Initial startup of Seed-ConfigNode */
if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+ LOGGER.info(
+ "The current {} is now starting as the Seed-ConfigNode.",
+ ConfigNodeConstant.GLOBAL_NAME);
+
+ // Init consensusGroup
+ CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
configManager.initConsensusManager();
+ // Persist the system parameters only after the consensusGroup is built;
+ // otherwise the consensusGroup will not be initialized successfully.
SystemPropertiesUtils.storeSystemParameters();
- SystemPropertiesUtils.storeConfigNodeId(SEED_CONFIG_NODE_ID);
+
// Seed-ConfigNode should apply itself when first start
configManager
.getNodeManager()
@@ -232,9 +242,7 @@ public class ConfigNode implements ConfigNodeMBean {
}
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- SystemPropertiesUtils.storeConfigNodeId(resp.getConfigNodeId());
CONF.setConfigNodeId(resp.getConfigNodeId());
-
configManager.initConsensusManager();
return;
} else if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 67e45635fa..ee060dc306 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -408,7 +408,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
TConfigNodeRegisterResp resp = configManager.registerConfigNode(req);
// Print log to record the ConfigNode that performs the RegisterConfigNodeRequest
- LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req, resp.getStatus());
+ LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req, resp);
return resp;
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
deleted file mode 100644
index 50a1e800d2..0000000000
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.service.thrift;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TNodeResource;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.exception.ConfigurationException;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
-import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
-import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
-import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
-import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
-
-import org.apache.ratis.util.FileUtils;
-import org.apache.thrift.TException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class ConfigNodeRPCServiceProcessorTest {
-
- ConfigNodeRPCServiceProcessor processor;
-
- @BeforeClass
- public static void beforeClass() throws StartupException, ConfigurationException, IOException {
- final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
- UDFExecutableManager.setupAndGetInstance(
- configNodeConfig.getTemporaryLibDir(), configNodeConfig.getUdfLibDir());
- UDFClassLoaderManager.setupAndGetInstance(configNodeConfig.getUdfLibDir());
- UDFRegistrationService.setupAndGetInstance(configNodeConfig.getSystemUdfDir());
- ConfigNodeStartupCheck.getInstance().startUpCheck();
- }
-
- @Before
- public void before() throws IOException {
- ConfigManager configManager = new ConfigManager();
- configManager.initConsensusManager();
- processor = new ConfigNodeRPCServiceProcessor(configManager);
- processor.getConsensusManager().singleCopyMayWaitUntilLeaderReady();
- }
-
- @After
- public void after() throws IOException {
- processor.close();
- FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
- FileUtils.deleteFully(
- new File(CommonDescriptor.getInstance().getConfig().getProcedureWalFolder()));
- }
-
- @AfterClass
- public static void afterClass() throws IOException {
- UDFExecutableManager.getInstance().stop();
- UDFClassLoaderManager.getInstance().stop();
- UDFRegistrationService.getInstance().stop();
- FileUtils.deleteFully(new File(ConfigNodeConstant.DATA_DIR));
- }
-
- private void checkGlobalConfig(TGlobalConfig globalConfig) {
- Assert.assertEquals(
- ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass(),
- globalConfig.getDataRegionConsensusProtocolClass());
- Assert.assertEquals(
- ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass(),
- globalConfig.getSchemaRegionConsensusProtocolClass());
- Assert.assertEquals(
- ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
- globalConfig.getSeriesPartitionSlotNum());
- Assert.assertEquals(
- ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
- globalConfig.getSeriesPartitionExecutorClass());
- }
-
- private void registerDataNodes() throws TException {
- for (int i = 0; i < 3; i++) {
- TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
- dataNodeLocation.setDataNodeId(-1);
- dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
- dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
- dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
- dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
- dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
-
- TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration();
- dataNodeConfiguration.setLocation(dataNodeLocation);
- dataNodeConfiguration.setResource(new TNodeResource(8, 1024 * 1024));
-
- TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeConfiguration);
- TDataNodeRegisterResp resp = processor.registerDataNode(req);
-
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
- Assert.assertEquals(i, resp.getDataNodeId());
- checkGlobalConfig(resp.getGlobalConfig());
- }
- }
-
- @Test
- public void testSetAndQueryStorageGroup() throws IllegalPathException, TException {
- TSStatus status;
- final String sg0 = "root.sg0";
- final String sg1 = "root.sg1";
-
- // register DataNodes
- registerDataNodes();
-
- // set StorageGroup0 by default values
- TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
- status = processor.setStorageGroup(setReq0);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- // set StorageGroup1 by specific values
- TSetStorageGroupReq setReq1 =
- new TSetStorageGroupReq(
- new TStorageGroupSchema(sg1)
- .setTTL(1024L)
- .setSchemaReplicationFactor(5)
- .setDataReplicationFactor(5)
- .setTimePartitionInterval(2048L));
- status = processor.setStorageGroup(setReq1);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- // test count all StorageGroups
- TCountStorageGroupResp countResp =
- processor.countMatchedStorageGroups(Arrays.asList("root", "**"));
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
- Assert.assertEquals(2, countResp.getCount());
-
- // test count one StorageGroup
- countResp = processor.countMatchedStorageGroups(Arrays.asList("root", "sg0"));
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
- Assert.assertEquals(1, countResp.getCount());
-
- // test query all StorageGroupSchemas
- TStorageGroupSchemaResp getResp =
- processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "**"));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
- Map<String, TStorageGroupSchema> schemaMap = getResp.getStorageGroupSchemaMap();
- Assert.assertEquals(2, schemaMap.size());
- TStorageGroupSchema storageGroupSchema = schemaMap.get(sg0);
- Assert.assertNotNull(storageGroupSchema);
- Assert.assertEquals(sg0, storageGroupSchema.getName());
- Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
- Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
- Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
- Assert.assertEquals(604800000, storageGroupSchema.getTimePartitionInterval());
- storageGroupSchema = schemaMap.get(sg1);
- Assert.assertNotNull(storageGroupSchema);
- Assert.assertEquals(sg1, storageGroupSchema.getName());
- Assert.assertEquals(1024L, storageGroupSchema.getTTL());
- Assert.assertEquals(5, storageGroupSchema.getSchemaReplicationFactor());
- Assert.assertEquals(5, storageGroupSchema.getDataReplicationFactor());
- Assert.assertEquals(2048L, storageGroupSchema.getTimePartitionInterval());
-
- // test fail by re-register
- status = processor.setStorageGroup(setReq0);
- Assert.assertEquals(
- TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
-
- // test StorageGroup setter interfaces
- PartialPath patternPath = new PartialPath(sg1);
- status =
- processor.setTTL(new TSetTTLReq(Arrays.asList(patternPath.getNodes()), Long.MAX_VALUE));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.setSchemaReplicationFactor(new TSetSchemaReplicationFactorReq(sg1, 1));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.setDataReplicationFactor(new TSetDataReplicationFactorReq(sg1, 1));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.setTimePartitionInterval(new TSetTimePartitionIntervalReq(sg1, 604800L));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- // test setter results
- getResp = processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "sg1"));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
- schemaMap = getResp.getStorageGroupSchemaMap();
- Assert.assertEquals(1, schemaMap.size());
- storageGroupSchema = schemaMap.get(sg1);
- Assert.assertNotNull(storageGroupSchema);
- Assert.assertEquals(sg1, storageGroupSchema.getName());
- Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
- Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
- Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
- Assert.assertEquals(604800, storageGroupSchema.getTimePartitionInterval());
- }
-
- /** Generate a PatternTree and serialize it into a ByteBuffer */
- private ByteBuffer generatePatternTreeBuffer(String[] paths)
- throws IllegalPathException, IOException {
- PathPatternTree patternTree = new PathPatternTree();
- for (String path : paths) {
- patternTree.appendPathPattern(new PartialPath(path));
- }
- patternTree.constructTree();
-
- PublicBAOS baos = new PublicBAOS();
- patternTree.serialize(baos);
- return ByteBuffer.wrap(baos.toByteArray());
- }
-
- @Test
- public void testDeleteStorageGroup() throws TException {
- TSStatus status;
- final String sg0 = "root.sg0";
- final String sg1 = "root.sg1";
- // register DataNodes
- registerDataNodes();
- ConfigNodeProcedureEnv.setSkipForTest(true);
- TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
- // set StorageGroup0 by default values
- status = processor.setStorageGroup(setReq0);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- // set StorageGroup1 by specific values
- TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
- status = processor.setStorageGroup(setReq1);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
- List<String> sgs = Arrays.asList(sg0, sg1);
- deleteStorageGroupsReq.setPrefixPathList(sgs);
- TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
- TStorageGroupSchemaResp root =
- processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
- Assert.assertTrue(root.getStorageGroupSchemaMap().isEmpty());
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), deleteSgStatus.getCode());
- }
-
- @Test
- public void testDeleteStorageGroupInvalidateCacheFailed() throws TException {
- TSStatus status;
- final String sg0 = "root.sg0";
- final String sg1 = "root.sg1";
- // register DataNodes
- registerDataNodes();
- ConfigNodeProcedureEnv.setSkipForTest(true);
- ConfigNodeProcedureEnv.setInvalidCacheResult(false);
- TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
- // set StorageGroup0 by default values
- status = processor.setStorageGroup(setReq0);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- // set StorageGroup1 by specific values
- TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
- status = processor.setStorageGroup(setReq1);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
- List<String> sgs = Arrays.asList(sg0, sg1);
- deleteStorageGroupsReq.setPrefixPathList(sgs);
- TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
- TStorageGroupSchemaResp root =
- processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
- // rollback success
- Assert.assertEquals(root.getStorageGroupSchemaMap().size(), 2);
- Assert.assertEquals(TSStatusCode.MULTIPLE_ERROR.getStatusCode(), deleteSgStatus.getCode());
- }
-
- @Test
- public void testGetSchemaNodeManagementPartition()
- throws TException, IllegalPathException, IOException {
- final String sg = "root.sg";
- final int storageGroupNum = 2;
-
- TSStatus status;
- TSchemaNodeManagementReq nodeManagementReq;
- TSchemaNodeManagementResp nodeManagementResp;
-
- // register DataNodes
- registerDataNodes();
-
- // set StorageGroups
- for (int i = 0; i < storageGroupNum; i++) {
- TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
- status = processor.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- }
-
- ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
- nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
- nodeManagementReq.setLevel(-1);
- nodeManagementResp = processor.getSchemaNodeManagementPartition(nodeManagementReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
- Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
- Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
- Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
- }
-}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index b1dc1bdb30..f2474a761b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -35,9 +35,11 @@ import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.Constant;
import org.apache.iotdb.jdbc.IoTDBConnection;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.ISession;
import org.apache.iotdb.session.Session;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import java.io.File;
@@ -165,6 +167,7 @@ public abstract class AbstractEnv implements BaseEnv {
}
public void testWorking() {
+ logger.info("Testing DataNode connection...");
List<String> endpoints =
dataNodeWrapperList.stream()
.map(DataNodeWrapper::getIpAndPortString)
@@ -178,6 +181,7 @@ public abstract class AbstractEnv implements BaseEnv {
Exception lastException = null;
for (int i = 0; i < 30; i++) {
try (Connection ignored = getConnection(dataNodeEndpoint, PROBE_TIMEOUT_MS)) {
+ logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint);
return null;
} catch (Exception e) {
lastException = e;
@@ -200,23 +204,40 @@ public abstract class AbstractEnv implements BaseEnv {
}
private void checkNodeHeartbeat() throws Exception {
+ logger.info("Testing cluster environment...");
TShowClusterResp showClusterResp;
Exception lastException = null;
boolean flag;
for (int i = 0; i < 30; i++) {
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
flag = true;
showClusterResp = client.showCluster();
- Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
- for (String status : nodeStatus.values()) {
- if (!status.equals("Running")) {
- flag = false;
- break;
+
+ // Check resp status
+ if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ flag = false;
+ }
+
+ // Check the number of nodes
+ if (showClusterResp.getNodeStatus().size()
+ != configNodeWrapperList.size() + dataNodeWrapperList.size()) {
+ flag = false;
+ }
+
+ // Check the status of nodes
+ if (flag) {
+ Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
+ for (String status : nodeStatus.values()) {
+ if (!status.equals("Running")) {
+ flag = false;
+ break;
+ }
}
}
- int nodeNum = configNodeWrapperList.size() + dataNodeWrapperList.size();
- if (flag && nodeStatus.size() == nodeNum) {
+
+ if (flag) {
+ logger.info("The cluster is now ready for testing!");
return;
}
} catch (Exception e) {
@@ -373,25 +394,48 @@ public abstract class AbstractEnv implements BaseEnv {
}
@Override
- public IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException {
+ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException {
IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
.createClientManager(
new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
for (int i = 0; i < 30; i++) {
- try {
- // Return ConfigNode connection of the Seed-ConfigNode
- return clientManager.borrowClient(
- new TEndPoint(
- configNodeWrapperList.get(0).getIp(), configNodeWrapperList.get(0).getPort()));
- } catch (IOException ignored) {
+ for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
+ try (SyncConfigNodeIServiceClient client =
+ clientManager.borrowClient(
+ new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()))) {
+ TShowClusterResp resp = client.showCluster();
+ // Only the ConfigNodeClient who connects to the ConfigNode-leader
+ // will respond the SUCCESS_STATUS
+ if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.info(
+ "Successfully get connection to the leader ConfigNode: {}",
+ configNodeWrapper.getIp());
+ return client;
+ }
+ } catch (TException e) {
+ logger.error(
+ "Borrow ConfigNodeClient from ConfigNode: {} failed because: {}, retrying...",
+ configNodeWrapper.getIp(),
+ e);
+ }
}
}
throw new IOException("Failed to get config node connection");
}
@Override
- public void restartDataNode(int index) {
+ public void startConfigNode(int index) {
+ configNodeWrapperList.get(index).start();
+ }
+
+ @Override
+ public void shutdownConfigNode(int index) {
+ configNodeWrapperList.get(index).stop();
+ }
+
+ @Override
+ public void startDataNode(int index) {
dataNodeWrapperList.get(index).start();
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index 7d81391010..95cf12bf35 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -147,7 +147,7 @@ public class RemoteServerEnv implements BaseEnv {
}
@Override
- public IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException {
+ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException {
return null;
}
@@ -159,7 +159,17 @@ public class RemoteServerEnv implements BaseEnv {
}
@Override
- public void restartDataNode(int index) {
+ public void startConfigNode(int index) {
+ getConfigNodeWrapperList().get(index).start();
+ }
+
+ @Override
+ public void shutdownConfigNode(int index) {
+ getConfigNodeWrapperList().get(index).stop();
+ }
+
+ @Override
+ public void startDataNode(int index) {
getDataNodeWrapperList().get(index).start();
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 0f022ca5dd..e42a578008 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -71,7 +71,7 @@ public interface BaseEnv {
void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList);
- IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException;
+ IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException;
default ISession getSessionConnection() throws IoTDBConnectionException {
return getSessionConnection(
@@ -132,7 +132,11 @@ public interface BaseEnv {
return session;
}
- void restartDataNode(int index);
+ void startConfigNode(int index);
+
+ void shutdownConfigNode(int index);
+
+ void startDataNode(int index);
void shutdownDataNode(int index);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
index cb73ebeeb0..d247992a0c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
@@ -161,7 +163,7 @@ public class IoTDBClusterPartitionIT {
final String allSg1 = "root.sg1.**";
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
TSStatus status;
ByteBuffer buffer;
TSchemaPartitionReq schemaPartitionReq;
@@ -296,7 +298,7 @@ public class IoTDBClusterPartitionIT {
final int timePartitionBatchSize = 10;
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
TSStatus status;
TDataPartitionReq dataPartitionReq;
TDataPartitionTableResp dataPartitionTableResp;
@@ -334,7 +336,8 @@ public class IoTDBClusterPartitionIT {
for (int retry = 0; retry < 5; retry++) {
// Build new Client since it's unstable
try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient)
+ EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
if (dataPartitionTableResp != null) {
@@ -401,7 +404,7 @@ public class IoTDBClusterPartitionIT {
EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
final String sg0 = sg + 0;
final String sg1 = sg + 1;
@@ -422,7 +425,7 @@ public class IoTDBClusterPartitionIT {
for (int retry = 0; retry < 5; retry++) {
// Build new Client since it's unstable in Win8 environment
try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
if (dataPartitionTableResp != null) {
break;
@@ -501,7 +504,7 @@ public class IoTDBClusterPartitionIT {
for (int retry = 0; retry < 5; retry++) {
// Build new Client since it's unstable in Win8 environment
try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
if (dataPartitionTableResp != null) {
break;
@@ -542,7 +545,7 @@ public class IoTDBClusterPartitionIT {
// since there exists one DataNode is shutdown
Assert.assertEquals(unknownCnt * 2, runningCnt);
- EnvFactory.getEnv().restartDataNode(testDataNodeId);
+ EnvFactory.getEnv().startDataNode(testDataNodeId);
// Wait for heartbeat check
while (true) {
boolean containUnknown = false;
@@ -596,7 +599,7 @@ public class IoTDBClusterPartitionIT {
final int timePartitionBatchSize = 10;
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
ByteBuffer buffer;
TSchemaPartitionReq schemaPartitionReq;
@@ -632,7 +635,7 @@ public class IoTDBClusterPartitionIT {
for (int retry = 0; retry < 5; retry++) {
// Build new Client since it's unstable
try (SyncConfigNodeIServiceClient configNodeClient =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
if (dataPartitionTableResp != null) {
@@ -752,4 +755,35 @@ public class IoTDBClusterPartitionIT {
Assert.assertEquals(seriesPartitionBatchSize, getSeriesSlotListResp.getSeriesSlotListSize());
}
}
+
+ @Test
+ public void testGetSchemaNodeManagementPartition()
+ throws IOException, TException, IllegalPathException {
+ final String sg = "root.sg";
+ final int storageGroupNum = 2;
+
+ TSStatus status;
+ TSchemaNodeManagementReq nodeManagementReq;
+ TSchemaNodeManagementResp nodeManagementResp;
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // set StorageGroups
+ for (int i = 0; i < storageGroupNum; i++) {
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
+ status = client.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+
+ ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
+ nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
+ nodeManagementReq.setLevel(-1);
+ nodeManagementResp = client.getSchemaNodeManagementPartition(nodeManagementReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
+ Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
+ Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
+ Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
+ }
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterRestartIT.java
new file mode 100644
index 0000000000..c9f3057f5d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterRestartIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode;
+
+import org.apache.iotdb.it.env.AbstractEnv;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBClusterRestartIT {
+
+ protected static String originalConfigNodeConsensusProtocolClass;
+ private static final String ratisConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+ private static final int testConfigNodeNum = 3;
+ private static final int testDataNodeNum = 3;
+
+ @Before
+ public void setUp() throws Exception {
+ originalConfigNodeConsensusProtocolClass =
+ ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+ ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ratisConsensusProtocolClass);
+
+ // Init 3C3D cluster environment
+ EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+ ConfigFactory.getConfig()
+ .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+ }
+
+ @Test
+ public void clusterRestartTest() throws InterruptedException {
+ // Shutdown all cluster nodes
+ for (int i = 0; i < testConfigNodeNum; i++) {
+ EnvFactory.getEnv().shutdownConfigNode(i);
+ }
+ for (int i = 0; i < testDataNodeNum; i++) {
+ EnvFactory.getEnv().shutdownDataNode(i);
+ }
+
+ // Sleep 1s before restart
+ TimeUnit.SECONDS.sleep(1);
+
+ // Restart all cluster nodes
+ for (int i = 0; i < testConfigNodeNum; i++) {
+ EnvFactory.getEnv().startConfigNode(i);
+ }
+ for (int i = 0; i < testDataNodeNum; i++) {
+ EnvFactory.getEnv().startDataNode(i);
+ }
+
+ ((AbstractEnv) EnvFactory.getEnv()).testWorking();
+ }
+
+ // TODO: Add persistence tests in the future
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
index aba7da3fcd..ee1279da99 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
@@ -179,7 +179,7 @@ public class IoTDBConfigNodeIT {
List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// add ConfigNode
for (int i = 0; i < 2; i++) {
ConfigNodeWrapper configNodeWrapper =
@@ -251,7 +251,7 @@ public class IoTDBConfigNodeIT {
List<TDataNodeInfo> dataNodeInfos;
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// add DataNode
DataNodeWrapper dataNodeWrapper =
new DataNodeWrapper(
@@ -375,7 +375,7 @@ public class IoTDBConfigNodeIT {
@Test
public void showClusterAndNodesTest() {
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
TShowClusterResp clusterNodes;
TShowConfigNodesResp showConfigNodesResp;
@@ -526,7 +526,7 @@ public class IoTDBConfigNodeIT {
paths.add("root.ln.**");
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
cleanUserAndRole(client);
// create user
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
index 07e999db7d..3f67c4b1c7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
@@ -126,7 +126,7 @@ public class IoTDBConfigNodeSnapshotIT {
final int timePartitionSlotsNum = 10;
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
List<TCreateTriggerReq> createTriggerReqs = createTrigger(client);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBStorageGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBStorageGroupIT.java
new file mode 100644
index 0000000000..19df2f2a74
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBStorageGroupIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBStorageGroupIT {
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeClass();
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+ }
+
+ @Test
+ public void testSetAndQueryStorageGroup() throws IOException, TException, IllegalPathException {
+ TSStatus status;
+ final String sg0 = "root.sg0";
+ final String sg1 = "root.sg1";
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // set StorageGroup0 by default values
+ TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+ status = client.setStorageGroup(setReq0);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ // set StorageGroup1 by specific values
+ TSetStorageGroupReq setReq1 =
+ new TSetStorageGroupReq(
+ new TStorageGroupSchema(sg1)
+ .setTTL(1024L)
+ .setSchemaReplicationFactor(5)
+ .setDataReplicationFactor(5)
+ .setTimePartitionInterval(2048L));
+ status = client.setStorageGroup(setReq1);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ // test count all StorageGroups
+ TCountStorageGroupResp countResp =
+ client.countMatchedStorageGroups(Arrays.asList("root", "**"));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
+ Assert.assertEquals(2, countResp.getCount());
+
+ // test count one StorageGroup
+ countResp = client.countMatchedStorageGroups(Arrays.asList("root", "sg0"));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
+ Assert.assertEquals(1, countResp.getCount());
+
+ // test query all StorageGroupSchemas
+ TStorageGroupSchemaResp getResp =
+ client.getMatchedStorageGroupSchemas(Arrays.asList("root", "**"));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
+ Map<String, TStorageGroupSchema> schemaMap = getResp.getStorageGroupSchemaMap();
+ Assert.assertEquals(2, schemaMap.size());
+ TStorageGroupSchema storageGroupSchema = schemaMap.get(sg0);
+ Assert.assertNotNull(storageGroupSchema);
+ Assert.assertEquals(sg0, storageGroupSchema.getName());
+ Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
+ Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
+ Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
+ Assert.assertEquals(604800000, storageGroupSchema.getTimePartitionInterval());
+ storageGroupSchema = schemaMap.get(sg1);
+ Assert.assertNotNull(storageGroupSchema);
+ Assert.assertEquals(sg1, storageGroupSchema.getName());
+ Assert.assertEquals(1024L, storageGroupSchema.getTTL());
+ Assert.assertEquals(5, storageGroupSchema.getSchemaReplicationFactor());
+ Assert.assertEquals(5, storageGroupSchema.getDataReplicationFactor());
+ Assert.assertEquals(2048L, storageGroupSchema.getTimePartitionInterval());
+
+ // test fail by re-register
+ status = client.setStorageGroup(setReq0);
+ Assert.assertEquals(
+ TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
+
+ // test StorageGroup setter interfaces
+ PartialPath patternPath = new PartialPath(sg1);
+ status = client.setTTL(new TSetTTLReq(Arrays.asList(patternPath.getNodes()), Long.MAX_VALUE));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ status = client.setSchemaReplicationFactor(new TSetSchemaReplicationFactorReq(sg1, 1));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ status = client.setDataReplicationFactor(new TSetDataReplicationFactorReq(sg1, 1));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ status = client.setTimePartitionInterval(new TSetTimePartitionIntervalReq(sg1, 604800L));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ // test setter results
+ getResp = client.getMatchedStorageGroupSchemas(Arrays.asList("root", "sg1"));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
+ schemaMap = getResp.getStorageGroupSchemaMap();
+ Assert.assertEquals(1, schemaMap.size());
+ storageGroupSchema = schemaMap.get(sg1);
+ Assert.assertNotNull(storageGroupSchema);
+ Assert.assertEquals(sg1, storageGroupSchema.getName());
+ Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
+ Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
+ Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
+ Assert.assertEquals(604800, storageGroupSchema.getTimePartitionInterval());
+ }
+ }
+
+ @Test
+ public void testDeleteStorageGroup() throws TException, IOException {
+ TSStatus status;
+ final String sg0 = "root.sg0";
+ final String sg1 = "root.sg1";
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+ // set StorageGroup0 by default values
+ status = client.setStorageGroup(setReq0);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ // set StorageGroup1 by specific values
+ TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
+ status = client.setStorageGroup(setReq1);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
+ List<String> sgs = Arrays.asList(sg0, sg1);
+ deleteStorageGroupsReq.setPrefixPathList(sgs);
+ TSStatus deleteSgStatus = client.deleteStorageGroups(deleteStorageGroupsReq);
+ TStorageGroupSchemaResp root =
+ client.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
+ Assert.assertTrue(root.getStorageGroupSchemaMap().isEmpty());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), deleteSgStatus.getCode());
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index daadfe77b0..1f756954db 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -131,7 +131,7 @@ public class StandaloneEnv implements BaseEnv {
}
@Override
- public IConfigNodeRPCService.Iface getConfigNodeConnection() {
+ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() {
return null;
}
@@ -178,7 +178,17 @@ public class StandaloneEnv implements BaseEnv {
}
@Override
- public void restartDataNode(int index) {
+ public void startConfigNode(int index) {
+ // Do nothing
+ }
+
+ @Override
+ public void shutdownConfigNode(int index) {
+ // Do nothing
+ }
+
+ @Override
+ public void startDataNode(int index) {
// Do nothing
}