You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/13 13:51:06 UTC

[iotdb] branch master updated: [IOTDB-5153] add snapshot transfer IT & UT for RatisConsensus (#8385)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 07c42312c9 [IOTDB-5153] add snapshot transfer IT & UT for RatisConsensus (#8385)
07c42312c9 is described below

commit 07c42312c9d324d5b55e3ca345ee4d69479c1afe
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Dec 13 21:50:59 2022 +0800

    [IOTDB-5153] add snapshot transfer IT & UT for RatisConsensus (#8385)
    
    * add snapshot IT & UT
    
    * temp save
    
    * add confignode
---
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  13 ++
 .../java/org/apache/iotdb/it/env/AbstractEnv.java  |  32 +++++
 .../java/org/apache/iotdb/it/env/MppConfig.java    |   9 +-
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |   5 +
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +-
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   2 +
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |   7 +-
 .../confignode/it/IoTDBSnapshotTransferIT.java     | 150 +++++++++++++++++++++
 .../org/apache/iotdb/db/it/env/StandaloneEnv.java  |   5 +
 9 files changed, 225 insertions(+), 6 deletions(-)

diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 8bba1a7a13..a519bf29ef 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -238,6 +238,19 @@ public class RatisConsensusTest {
     Assert.assertEquals((leaderIndex + 1) % 3, newLeaderIndex);
   }
 
+  @Test
+  public void transferSnapshot() throws Exception {
+    servers.get(0).createPeer(gid, peers.subList(0, 1));
+    // Scenario: a group holding only peers[0] takes a snapshot before a second peer is added.
+    doConsensus(servers.get(0), gid, 10, 10);
+    Assert.assertTrue(servers.get(0).triggerSnapshot(gid).isSuccess());
+    // Server 1 creates the group with an empty peer list; server 0 then adds peers[1] to it.
+    servers.get(1).createPeer(gid, Collections.emptyList());
+    servers.get(0).addPeer(gid, peers.get(1));
+    // doConsensus now goes through server 1 (same count, target 20 instead of 10).
+    doConsensus(servers.get(1), gid, 10, 20);
+  }
+
   private void doConsensus(IConsensus consensus, ConsensusGroupId gid, int count, int target)
       throws Exception {
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 147bff2b27..86cc649615 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -550,6 +550,38 @@ public abstract class AbstractEnv implements BaseEnv {
     testWorking();
   }
 
+  /** Adds and starts one more ConfigNode; a SQLException during start-up fails the test. */
+  @Override
+  public void registerNewConfigNode() {
+    final String firstNodeAddress = configNodeWrapperList.get(0).getIpAndPortString();
+    final ConfigNodeWrapper addedNode =
+        new ConfigNodeWrapper(
+            false,
+            firstNodeAddress,
+            getTestClassName(),
+            getTestMethodName(),
+            EnvUtils.searchAvailablePorts());
+    configNodeWrapperList.add(addedNode);
+    addedNode.createDir();
+    addedNode.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
+    // Start new ConfigNode through a single-endpoint delegate bounded by NODE_START_TIMEOUT
+    final RequestDelegate<Void> startDelegate =
+        new ParallelRequestDelegate<>(
+            Collections.singletonList(addedNode.getIpAndPortString()), NODE_START_TIMEOUT);
+    startDelegate.addRequest(
+        () -> {
+          addedNode.start();
+          return null;
+        });
+    try {
+      startDelegate.requestAll();
+    } catch (SQLException startFailure) {
+      // requestAll() surfaced a failure: log it, then abort the running test
+      logger.error("Start configNode failed", startFailure);
+      fail();
+    }
+  }
+
   @Override
   public void startDataNode(int index) {
     dataNodeWrapperList.get(index).start();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index df5535bdf8..098d70faa4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -291,7 +291,7 @@ public class MppConfig implements BaseConfig {
   }
 
   @Override
-  public BaseConfig setRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
+  public BaseConfig setConfigNodeRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
     confignodeProperties.setProperty(
         "config_node_ratis_snapshot_trigger_threshold",
         String.valueOf(ratisSnapshotTriggerThreshold));
@@ -388,4 +388,11 @@ public class MppConfig implements BaseConfig {
     confignodeProperties.setProperty("query_thread_count", String.valueOf(queryThreadCount));
     return this;
   }
+
+  @Override
+  public BaseConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
+    final String key = "data_region_ratis_snapshot_trigger_threshold"; // kept in ConfigNode props
+    confignodeProperties.setProperty(key, Long.toString(threshold));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index bb3b1340e2..ba62c5e0c4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -194,6 +194,11 @@ public class RemoteServerEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void registerNewConfigNode() {
+    throw new UnsupportedOperationException(); // always thrown: not implemented by this env
+  }
+
   @Override
   public void startDataNode(int index) {
     throw new UnsupportedOperationException();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 31def0bc8b..2085853687 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -295,11 +295,11 @@ public interface BaseConfig {
     return 86400;
   }
 
-  default BaseConfig setRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
+  default BaseConfig setConfigNodeRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
     return this;
   }
 
-  default int getRatisSnapshotTriggerThreshold() {
+  default int getConfigNodeRatisSnapshotTriggerThreshold() {
     return 400000;
   }
 
@@ -412,4 +412,8 @@ public interface BaseConfig {
   default int getQueryThreadCount() {
     return Runtime.getRuntime().availableProcessors();
   }
+
+  default BaseConfig setDataRatisTriggerSnapshotThreshold(long threshold) {
+    return this; // default ignores the value; implementations may override to store it
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 04ae16ba74..ad57243849 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -201,6 +201,8 @@ public interface BaseEnv {
   /** Register a new DataNode */
   void registerNewDataNode();
 
+  void registerNewConfigNode();
+
   /** Start an existed DataNode */
   void startDataNode(int index);
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index d8d4f5d071..dbff815988 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -95,8 +95,9 @@ public class IoTDBConfigNodeSnapshotIT {
     ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
 
     originalRatisSnapshotTriggerThreshold =
-        ConfigFactory.getConfig().getRatisSnapshotTriggerThreshold();
-    ConfigFactory.getConfig().setRatisSnapshotTriggerThreshold(testRatisSnapshotTriggerThreshold);
+        ConfigFactory.getConfig().getConfigNodeRatisSnapshotTriggerThreshold();
+    ConfigFactory.getConfig()
+        .setConfigNodeRatisSnapshotTriggerThreshold(testRatisSnapshotTriggerThreshold);
 
     originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
     ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
@@ -112,7 +113,7 @@ public class IoTDBConfigNodeSnapshotIT {
     ConfigFactory.getConfig()
         .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
     ConfigFactory.getConfig()
-        .setRatisSnapshotTriggerThreshold(originalRatisSnapshotTriggerThreshold);
+        .setConfigNodeRatisSnapshotTriggerThreshold(originalRatisSnapshotTriggerThreshold);
     ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
   }
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
new file mode 100644
index 0000000000..07229c715f
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.Optional;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBSnapshotTransferIT {
+
+  /** Data-region snapshot trigger threshold; every insert batch writes one row more than this. */
+  private static final long SNAPSHOT_MAGIC = 99;
+
+  /** Selects Ratis with replication factor 2, then calls initClusterEnvironment(3, 3). */
+  @Before
+  public void setUp() throws Exception {
+    ConfigFactory.getConfig()
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setConfigNodeConsesusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataReplicationFactor(2)
+        .setSchemaReplicationFactor(2)
+        .setDataRatisTriggerSnapshotThreshold(SNAPSHOT_MAGIC)
+        .setConfigNodeRatisSnapshotTriggerThreshold(1); // must trigger snapshot
+
+    EnvFactory.getEnv().initClusterEnvironment(3, 3);
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  /** Checks reads and writes after a DataNode removal, then registers one more ConfigNode. */
+  @Test
+  public void testSnapshotTransfer() throws Exception {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement();
+        final SyncConfigNodeIServiceClient configClient =
+            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      statement.execute("CREATE DATABASE root.emma");
+      statement.execute(
+          "CREATE TIMESERIES root.emma.william.ethereal WITH DATATYPE=INT32,ENCODING=PLAIN");
+
+      // insert large amounts of data to trigger a snapshot
+      insertRows(statement, 0);
+
+      final TShowRegionResp result = configClient.showRegion(new TShowRegionReq());
+      Assert.assertNotNull(result.getRegionInfoList());
+
+      final Optional<TRegionInfo> dataRegionInfo =
+          result.getRegionInfoList().stream()
+              .filter(
+                  info ->
+                      info.getConsensusGroupId().getType().equals(TConsensusGroupType.DataRegion))
+              .findAny();
+      Assert.assertTrue(dataRegionInfo.isPresent());
+
+      final int dataNodeId = dataRegionInfo.get().getDataNodeId();
+      final TDataNodeConfigurationResp locationResp =
+          configClient.getDataNodeConfiguration(dataNodeId);
+      Assert.assertNotNull(locationResp.getDataNodeConfigurationMap());
+
+      // remove this datanode and the regions will be transferred to the other one
+      final TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq();
+      final TDataNodeLocation location =
+          locationResp.getDataNodeConfigurationMap().get(dataNodeId).getLocation();
+      removeReq.setDataNodeLocations(Collections.singletonList(location));
+      final TDataNodeRemoveResp removeResp = configClient.removeDataNode(removeReq);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), removeResp.getStatus().getCode());
+
+      // insert new data to ensure that the data regions are properly functioning
+      insertRows(statement, SNAPSHOT_MAGIC * 2);
+
+      try (final ResultSet readResult =
+          statement.executeQuery("SELECT count(*) from root.emma.william")) {
+        Assert.assertTrue(readResult.next());
+        Assert.assertEquals(
+            2 * (SNAPSHOT_MAGIC + 1), readResult.getInt("count(root.emma.william.ethereal)"));
+      }
+
+      EnvFactory.getEnv().registerNewConfigNode();
+      final TShowRegionResp registerResult = configClient.showRegion(new TShowRegionReq());
+      Assert.assertNotNull(registerResult.getRegionInfoList());
+
+      final long configNodeGroupCount =
+          registerResult.getRegionInfoList().stream()
+              .filter(
+                  info ->
+                      info.getConsensusGroupId()
+                          .getType()
+                          .equals(TConsensusGroupType.ConfigNodeRegion))
+              .count();
+      Assert.assertEquals(4, configNodeGroupCount);
+    }
+  }
+
+  /** Inserts SNAPSHOT_MAGIC + 1 rows; timestamp and value both count up from firstValue. */
+  private static void insertRows(Statement statement, long firstValue) throws Exception {
+    for (long value = firstValue; value <= firstValue + SNAPSHOT_MAGIC; value++) {
+      statement.execute(
+          String.format(
+              "INSERT INTO root.emma.william(timestamp, ethereal) values(%d, %d)", value, value));
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 608961bb57..ac2ca63976 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -203,6 +203,11 @@ public class StandaloneEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void registerNewConfigNode() {
+    throw new UnsupportedOperationException(); // always thrown: not implemented by this env
+  }
+
   @Override
   public void startDataNode(int index) {
     throw new UnsupportedOperationException();