You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/13 13:51:06 UTC
[iotdb] branch master updated: [IOTDB-5153] add snapshot transfer IT & UT for RatisConsensus (#8385)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 07c42312c9 [IOTDB-5153] add snapshot transfer IT & UT for RatisConsensus (#8385)
07c42312c9 is described below
commit 07c42312c9d324d5b55e3ca345ee4d69479c1afe
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Dec 13 21:50:59 2022 +0800
[IOTDB-5153] add snapshot transfer IT & UT for RatisConsensus (#8385)
* add snapshot IT & UT
* temp save
* add confignode
---
.../iotdb/consensus/ratis/RatisConsensusTest.java | 13 ++
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 32 +++++
.../java/org/apache/iotdb/it/env/MppConfig.java | 9 +-
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 5 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +-
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 +
.../confignode/it/IoTDBConfigNodeSnapshotIT.java | 7 +-
.../confignode/it/IoTDBSnapshotTransferIT.java | 150 +++++++++++++++++++++
.../org/apache/iotdb/db/it/env/StandaloneEnv.java | 5 +
9 files changed, 225 insertions(+), 6 deletions(-)
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 8bba1a7a13..a519bf29ef 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -238,6 +238,19 @@ public class RatisConsensusTest {
Assert.assertEquals((leaderIndex + 1) % 3, newLeaderIndex);
}
+ /**
+  * Creates the group on server 0 alone, runs a round of doConsensus against it and takes a
+  * snapshot, then brings server 1 into the group and runs doConsensus against server 1.
+  * NOTE(review): presumably server 1 is expected to catch up through the snapshot taken on
+  * server 0 -- confirm that this path really installs a snapshot rather than replaying logs.
+  */
+ @Test
+ public void transferSnapshot() throws Exception {
+ // Start with a single-member group hosted on server 0.
+ servers.get(0).createPeer(gid, peers.subList(0, 1));
+
+ doConsensus(servers.get(0), gid, 10, 10);
+ // The snapshot request itself must report success before the membership change.
+ Assert.assertTrue(servers.get(0).triggerSnapshot(gid).isSuccess());
+
+ // Server 1 creates the group locally with an empty peer list, then server 0 adds it.
+ servers.get(1).createPeer(gid, Collections.emptyList());
+ servers.get(0).addPeer(gid, peers.get(1));
+
+ // Run doConsensus against the newly added server.
+ doConsensus(servers.get(1), gid, 10, 20);
+ }
+
private void doConsensus(IConsensus consensus, ConsensusGroupId gid, int count, int target)
throws Exception {
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 147bff2b27..86cc649615 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -550,6 +550,38 @@ public abstract class AbstractEnv implements BaseEnv {
testWorking();
}
+ /**
+  * Adds one more ConfigNode to the running test environment: builds its wrapper, appends it to
+  * {@code configNodeWrapperList}, prepares its directory and configuration, and starts it.
+  * A {@link SQLException} raised while starting is logged and fails the test.
+  */
+ @Override
+ public void registerNewConfigNode() {
+   // The joining node is handed the address of the first ConfigNode already in the list.
+   // NOTE(review): the leading `false` presumably marks it as a non-seed node -- confirm
+   // against ConfigNodeWrapper's constructor.
+   final ConfigNodeWrapper joiningNode =
+       new ConfigNodeWrapper(
+           false,
+           configNodeWrapperList.get(0).getIpAndPortString(),
+           getTestClassName(),
+           getTestMethodName(),
+           EnvUtils.searchAvailablePorts());
+   configNodeWrapperList.add(joiningNode);
+   joiningNode.createDir();
+   joiningNode.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
+
+   // Launch the node through a delegate that targets only its own endpoint.
+   final RequestDelegate<Void> startRequest =
+       new ParallelRequestDelegate<>(
+           Collections.singletonList(joiningNode.getIpAndPortString()), NODE_START_TIMEOUT);
+   startRequest.addRequest(
+       () -> {
+         joiningNode.start();
+         return null;
+       });
+
+   try {
+     startRequest.requestAll();
+   } catch (SQLException startFailure) {
+     logger.error("Start configNode failed", startFailure);
+     fail();
+   }
+ }
+
@Override
public void startDataNode(int index) {
dataNodeWrapperList.get(index).start();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index df5535bdf8..098d70faa4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -291,7 +291,7 @@ public class MppConfig implements BaseConfig {
}
@Override
- public BaseConfig setRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
+ public BaseConfig setConfigNodeRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
confignodeProperties.setProperty(
"config_node_ratis_snapshot_trigger_threshold",
String.valueOf(ratisSnapshotTriggerThreshold));
@@ -388,4 +388,11 @@ public class MppConfig implements BaseConfig {
confignodeProperties.setProperty("query_thread_count", String.valueOf(queryThreadCount));
return this;
}
+
+ /**
+  * Records the data-region Ratis snapshot trigger threshold under the
+  * {@code data_region_ratis_snapshot_trigger_threshold} key.
+  * NOTE(review): the value is written into {@code confignodeProperties}; confirm that this is
+  * the property set the data-region consensus layer actually reads.
+  *
+  * @param threshold value stored for the property, in its decimal string form
+  * @return this config, to allow chained setter calls
+  */
+ @Override
+ public BaseConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
+   final String thresholdText = Long.toString(threshold);
+   confignodeProperties.setProperty("data_region_ratis_snapshot_trigger_threshold", thresholdText);
+   return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index bb3b1340e2..ba62c5e0c4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -194,6 +194,11 @@ public class RemoteServerEnv implements BaseEnv {
throw new UnsupportedOperationException();
}
+ /** Not supported by this environment: always throws {@link UnsupportedOperationException}. */
+ @Override
+ public void registerNewConfigNode() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void startDataNode(int index) {
throw new UnsupportedOperationException();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 31def0bc8b..2085853687 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -295,11 +295,11 @@ public interface BaseConfig {
return 86400;
}
- default BaseConfig setRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
+ default BaseConfig setConfigNodeRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
return this;
}
- default int getRatisSnapshotTriggerThreshold() {
+ default int getConfigNodeRatisSnapshotTriggerThreshold() {
return 400000;
}
@@ -412,4 +412,8 @@ public interface BaseConfig {
default int getQueryThreadCount() {
return Runtime.getRuntime().availableProcessors();
}
+
+ /**
+  * Default implementation: ignores {@code threshold} and returns this config unchanged.
+  * Implementations that support the setting override this method.
+  */
+ default BaseConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 04ae16ba74..ad57243849 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -201,6 +201,8 @@ public interface BaseEnv {
/** Register a new DataNode */
void registerNewDataNode();
+ /** Register a new ConfigNode */
+ void registerNewConfigNode();
+
/** Start an existed DataNode */
void startDataNode(int index);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index d8d4f5d071..dbff815988 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -95,8 +95,9 @@ public class IoTDBConfigNodeSnapshotIT {
ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
originalRatisSnapshotTriggerThreshold =
- ConfigFactory.getConfig().getRatisSnapshotTriggerThreshold();
- ConfigFactory.getConfig().setRatisSnapshotTriggerThreshold(testRatisSnapshotTriggerThreshold);
+ ConfigFactory.getConfig().getConfigNodeRatisSnapshotTriggerThreshold();
+ ConfigFactory.getConfig()
+ .setConfigNodeRatisSnapshotTriggerThreshold(testRatisSnapshotTriggerThreshold);
originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
@@ -112,7 +113,7 @@ public class IoTDBConfigNodeSnapshotIT {
ConfigFactory.getConfig()
.setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
ConfigFactory.getConfig()
- .setRatisSnapshotTriggerThreshold(originalRatisSnapshotTriggerThreshold);
+ .setConfigNodeRatisSnapshotTriggerThreshold(originalRatisSnapshotTriggerThreshold);
ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
new file mode 100644
index 0000000000..07229c715f
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * Cluster integration test for snapshot transfer under Ratis consensus.
+ *
+ * <p>The test writes enough rows to pass the configured data-region snapshot threshold, removes
+ * the DataNode that hosts a data region, checks that the data is still writable and readable,
+ * and finally registers an additional ConfigNode.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBSnapshotTransferIT {
+
+  /** Data-region snapshot threshold; each insert loop below writes one row more than this. */
+  private final long snapshotMagic = 99;
+
+  /** Switches all consensus groups to Ratis with low snapshot thresholds and starts the cluster. */
+  @Before
+  public void setUp() throws Exception {
+    ConfigFactory.getConfig()
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setConfigNodeConsesusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataReplicationFactor(2)
+        .setSchemaReplicationFactor(2)
+        .setDataRatisTriggerSnapshotThreshold(snapshotMagic)
+        .setConfigNodeRatisSnapshotTriggerThreshold(1); // must trigger snapshot
+
+    EnvFactory.getEnv().initClusterEnvironment(3, 3);
+  }
+
+  /**
+   * Tears the cluster down.
+   * NOTE(review): the overrides applied in setUp are not explicitly restored here; confirm that
+   * cleanAfterClass resets them so later tests are unaffected.
+   */
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  @Test
+  public void testSnapshotTransfer() throws Exception {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement();
+        final SyncConfigNodeIServiceClient configClient =
+            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      statement.execute("CREATE DATABASE root.emma");
+      statement.execute(
+          "CREATE TIMESERIES root.emma.william.ethereal WITH DATATYPE=INT32,ENCODING=PLAIN");
+
+      // insert large amounts of data to trigger a snapshot
+      for (int i = 0; i < snapshotMagic + 1; i++) {
+        final String insert =
+            String.format(
+                "INSERT INTO root.emma.william(timestamp, ethereal) values(%d, %d)", i, i);
+        statement.execute(insert);
+      }
+
+      final TShowRegionResp result = configClient.showRegion(new TShowRegionReq());
+      Assert.assertNotNull(result.getRegionInfoList());
+
+      // pick any data region; the DataNode hosting it is the one that will be removed
+      final Optional<TRegionInfo> dataRegionInfo =
+          result.getRegionInfoList().stream()
+              .filter(
+                  info ->
+                      info.getConsensusGroupId().getType().equals(TConsensusGroupType.DataRegion))
+              .findAny();
+      Assert.assertTrue(dataRegionInfo.isPresent());
+
+      final int dataNodeId = dataRegionInfo.get().getDataNodeId();
+
+      final TDataNodeConfigurationResp locationResp =
+          configClient.getDataNodeConfiguration(dataNodeId);
+      Assert.assertNotNull(locationResp.getDataNodeConfigurationMap());
+
+      // remove this datanode and the regions will be transferred to the other one
+      final TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq();
+      final TDataNodeLocation location =
+          locationResp.getDataNodeConfigurationMap().get(dataNodeId).getLocation();
+
+      removeReq.setDataNodeLocations(Collections.singletonList(location));
+      final TDataNodeRemoveResp removeResp = configClient.removeDataNode(removeReq);
+      // expected value first, so a failure message reports the two codes the right way round
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), removeResp.getStatus().getCode());
+
+      // insert new data to ensure that the data regions are properly functioning;
+      // timestamps start at 2 * snapshotMagic so they never collide with the first batch
+      for (int i = 0; i < snapshotMagic + 1; i++) {
+        final String insert =
+            String.format(
+                "INSERT INTO root.emma.william(timestamp, ethereal) values (%d, %d)",
+                i + snapshotMagic * 2, i + snapshotMagic * 2);
+        statement.execute(insert);
+      }
+
+      // both batches must be visible; close the result set and fail clearly on an empty result
+      try (final ResultSet readResult =
+          statement.executeQuery("SELECT count(*) from root.emma.william")) {
+        Assert.assertTrue(readResult.next());
+        Assert.assertEquals(
+            2 * (snapshotMagic + 1), readResult.getInt("count(root.emma.william.ethereal)"));
+      }
+
+      EnvFactory.getEnv().registerNewConfigNode();
+      final TShowRegionResp registerResult = configClient.showRegion(new TShowRegionReq());
+      // check the response fetched after registration, not the earlier one
+      Assert.assertNotNull(registerResult.getRegionInfoList());
+
+      // NOTE(review): 4 is presumably the 3 initial ConfigNodes plus the newly registered one
+      final long configNodeGroupCount =
+          registerResult.getRegionInfoList().stream()
+              .filter(
+                  info ->
+                      info.getConsensusGroupId()
+                          .getType()
+                          .equals(TConsensusGroupType.ConfigNodeRegion))
+              .count();
+      Assert.assertEquals(4, configNodeGroupCount);
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 608961bb57..ac2ca63976 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -203,6 +203,11 @@ public class StandaloneEnv implements BaseEnv {
throw new UnsupportedOperationException();
}
+ /** Not supported by this environment: always throws {@link UnsupportedOperationException}. */
+ @Override
+ public void registerNewConfigNode() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void startDataNode(int index) {
throw new UnsupportedOperationException();