You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/12 08:44:17 UTC

[iotdb] 01/01: [IOTDB-5400] IoTConsensus loses data when the replica number changes from 1 to several (#8837)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5400_cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4c6bfa96cddfd20eaef4491ddb74c3c03a37d317
Author: Potato <ta...@apache.org>
AuthorDate: Thu Jan 12 16:31:50 2023 +0800

    [IOTDB-5400] IoTConsensus loses data when the replica number changes from 1 to several (#8837)
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  1 +
 .../consensus/iot/IoTConsensusServerImpl.java      | 32 ++++++++++++++--------
 .../consensus/iot/IoTConsensusServerMetrics.java   |  2 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |  2 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |  2 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  | 32 +++++++++++-----------
 6 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5135d384a2..5cfe53459d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -262,6 +262,7 @@ public class IoTConsensus implements IConsensus {
 
       // step 2: notify all the other Peers to build the sync connection to newPeer
       logger.info("[IoTConsensus] notify current peers to build sync log...");
+      impl.checkAndLockSafeDeletedSearchIndex();
       impl.notifyPeersToBuildSyncLogChannel(peer);
 
       // step 3: take snapshot
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..298a8caa12 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,7 +98,7 @@ public class IoTConsensusServerImpl {
   private final Condition stateMachineCondition = stateMachineLock.newCondition();
   private final String storageDir;
   private final List<Peer> configuration;
-  private final AtomicLong index;
+  private final AtomicLong searchIndex;
   private final LogDispatcher logDispatcher;
   private final IoTConsensusConfig config;
   private final ConsensusReqReader reader;
@@ -136,7 +136,7 @@ public class IoTConsensusServerImpl {
       // only one configuration means single replica.
       reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
     }
-    this.index = new AtomicLong(currentSearchIndex);
+    this.searchIndex = new AtomicLong(currentSearchIndex);
     this.consensusGroupId = thisNode.getGroupId().toString();
     this.metrics = new IoTConsensusServerMetrics(this);
   }
@@ -180,7 +180,7 @@ public class IoTConsensusServerImpl {
       if (needBlockWrite()) {
         logger.info(
             "[Throttle Down] index:{}, safeIndex:{}",
-            getIndex(),
+            getSearchIndex(),
             getCurrentSafelyDeletedSearchIndex());
         try {
           boolean timeout =
@@ -240,9 +240,9 @@ public class IoTConsensusServerImpl {
         // is not expected and will slow down the preparation speed for batch.
         // So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are
         // in one transaction.
-        synchronized (index) {
+        synchronized (searchIndex) {
           logDispatcher.offer(indexedConsensusRequest);
-          index.incrementAndGet();
+          searchIndex.incrementAndGet();
         }
         // statistic the time of offering request into queue
         MetricService.getInstance()
@@ -456,7 +456,7 @@ public class IoTConsensusServerImpl {
       if (peer.equals(thisNode)) {
         // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
         // snapshot produced by thisNode
-        buildSyncLogChannel(targetPeer, index.get());
+        buildSyncLogChannel(targetPeer, searchIndex.get());
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
         try (SyncIoTConsensusServiceClient client =
@@ -668,7 +668,7 @@ public class IoTConsensusServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
-    return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request));
+    return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request));
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
@@ -682,7 +682,7 @@ public class IoTConsensusServerImpl {
    * single copies, the current index is selected
    */
   public long getCurrentSafelyDeletedSearchIndex() {
-    return logDispatcher.getMinSyncIndex().orElseGet(index::get);
+    return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
   }
 
   public String getStorageDir() {
@@ -697,8 +697,8 @@ public class IoTConsensusServerImpl {
     return configuration;
   }
 
-  public long getIndex() {
-    return index.get();
+  public long getSearchIndex() {
+    return searchIndex.get();
   }
 
   public IoTConsensusConfig getConfig() {
@@ -723,7 +723,7 @@ public class IoTConsensusServerImpl {
   }
 
   public AtomicLong getIndexObject() {
-    return index;
+    return searchIndex;
   }
 
   public boolean isReadOnly() {
@@ -768,4 +768,14 @@ public class IoTConsensusServerImpl {
       }
     }
   }
+
+  /**
+   * If this group has a single replica, pin safelyDeletedSearchIndex to the current searchIndex
+   * before addPeer, so data the new peer may still need is not deleted (avoids data loss).
+   */
+  public void checkAndLockSafeDeletedSearchIndex() {
+    if (configuration.size() == 1) {
+      reader.setSafelyDeletedSearchIndex(searchIndex.get());
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 88b25486b7..1e36e20b70 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -41,7 +41,7 @@ public class IoTConsensusServerMetrics implements IMetricSet {
             Metric.IOT_CONSENSUS.toString(),
             MetricLevel.IMPORTANT,
             impl,
-            IoTConsensusServerImpl::getIndex,
+            IoTConsensusServerImpl::getSearchIndex,
             Tag.NAME.toString(),
             "ioTConsensusServerImpl",
             Tag.REGION.toString(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 752a392ed1..93de711b76 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -344,7 +344,7 @@ public class LogDispatcher {
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndex;
       synchronized (impl.getIndexObject()) {
-        maxIndex = impl.getIndex() + 1;
+        maxIndex = impl.getSearchIndex() + 1;
         logger.debug(
             "{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}",
             impl.getThisNode().getGroupId(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index f966528cdc..6b24f596a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -236,7 +236,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy
       resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
       return;
     }
-    long searchIndex = impl.getIndex();
+    long searchIndex = impl.getSearchIndex();
     long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
     resultHandler.onComplete(
         new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index e1675e7994..a36db6e3ea 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -120,17 +120,17 @@ public class ReplicateTest {
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
     servers.get(2).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
       servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex());
     }
 
     for (int i = 0; i < 3; i++) {
@@ -163,9 +163,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
@@ -197,14 +197,14 @@ public class ReplicateTest {
     servers.get(0).createPeer(group.getGroupId(), group.getPeers());
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
     }
 
     Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -219,9 +219,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 2; i++) {
       long start = System.currentTimeMillis();