You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/11 11:21:55 UTC

[iotdb] branch jira5400 created (now 64dc4cdd63)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5400
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 64dc4cdd63 finish

This branch includes the following new commits:

     new 64dc4cdd63 finish

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: finish

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5400
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64dc4cdd6361426688dab297b9f96f6389a51243
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Wed Jan 11 19:21:30 2023 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  1 +
 .../consensus/iot/IoTConsensusServerImpl.java      | 39 ++++++++++++++--------
 .../consensus/iot/IoTConsensusServerMetrics.java   |  2 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |  2 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |  2 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  | 32 +++++++++---------
 6 files changed, 46 insertions(+), 32 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5135d384a2..5cfe53459d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -262,6 +262,7 @@ public class IoTConsensus implements IConsensus {
 
       // step 2: notify all the other Peers to build the sync connection to newPeer
       logger.info("[IoTConsensus] notify current peers to build sync log...");
+      impl.checkAndLockSafeDeletedSearchIndex();
       impl.notifyPeersToBuildSyncLogChannel(peer);
 
       // step 3: take snapshot
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..c4f741ac91 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,7 +98,7 @@ public class IoTConsensusServerImpl {
   private final Condition stateMachineCondition = stateMachineLock.newCondition();
   private final String storageDir;
   private final List<Peer> configuration;
-  private final AtomicLong index;
+  private final AtomicLong searchIndex;
   private final LogDispatcher logDispatcher;
   private final IoTConsensusConfig config;
   private final ConsensusReqReader reader;
@@ -136,7 +136,7 @@ public class IoTConsensusServerImpl {
       // only one configuration means single replica.
       reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
     }
-    this.index = new AtomicLong(currentSearchIndex);
+    this.searchIndex = new AtomicLong(currentSearchIndex);
     this.consensusGroupId = thisNode.getGroupId().toString();
     this.metrics = new IoTConsensusServerMetrics(this);
   }
@@ -158,7 +158,8 @@ public class IoTConsensusServerImpl {
   }
 
   /**
-   * records the index of the log and writes locally, and then asynchronous replication is performed
+   * records the index of the log and writes locally, and then asynchronous replication is
+   * performed
    */
   public TSStatus write(IConsensusRequest request) {
     long consensusWriteStartTime = System.currentTimeMillis();
@@ -180,7 +181,7 @@ public class IoTConsensusServerImpl {
       if (needBlockWrite()) {
         logger.info(
             "[Throttle Down] index:{}, safeIndex:{}",
-            getIndex(),
+            getSearchIndex(),
             getCurrentSafelyDeletedSearchIndex());
         try {
           boolean timeout =
@@ -240,9 +241,9 @@ public class IoTConsensusServerImpl {
         // is not expected and will slow down the preparation speed for batch.
         // So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are
         // in one transaction.
-        synchronized (index) {
+        synchronized (searchIndex) {
           logDispatcher.offer(indexedConsensusRequest);
-          index.incrementAndGet();
+          searchIndex.incrementAndGet();
         }
         // statistic the time of offering request into queue
         MetricService.getInstance()
@@ -456,7 +457,7 @@ public class IoTConsensusServerImpl {
       if (peer.equals(thisNode)) {
         // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
         // snapshot produced by thisNode
-        buildSyncLogChannel(targetPeer, index.get());
+        buildSyncLogChannel(targetPeer, searchIndex.get());
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
         try (SyncIoTConsensusServiceClient client =
@@ -565,7 +566,9 @@ public class IoTConsensusServerImpl {
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
   }
 
-  /** build SyncLog channel with safeIndex as the default initial sync index */
+  /**
+   * build SyncLog channel with safeIndex as the default initial sync index
+   */
   public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
     buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
   }
@@ -668,7 +671,7 @@ public class IoTConsensusServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
-    return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request));
+    return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request));
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
@@ -682,7 +685,7 @@ public class IoTConsensusServerImpl {
    * single copies, the current index is selected
    */
   public long getCurrentSafelyDeletedSearchIndex() {
-    return logDispatcher.getMinSyncIndex().orElseGet(index::get);
+    return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
   }
 
   public String getStorageDir() {
@@ -697,8 +700,8 @@ public class IoTConsensusServerImpl {
     return configuration;
   }
 
-  public long getIndex() {
-    return index.get();
+  public long getSearchIndex() {
+    return searchIndex.get();
   }
 
   public IoTConsensusConfig getConfig() {
@@ -723,7 +726,7 @@ public class IoTConsensusServerImpl {
   }
 
   public AtomicLong getIndexObject() {
-    return index;
+    return searchIndex;
   }
 
   public boolean isReadOnly() {
@@ -768,4 +771,14 @@ public class IoTConsensusServerImpl {
       }
     }
   }
+
+  /**
+   * We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data
+   * lost
+   */
+  public void checkAndLockSafeDeletedSearchIndex() {
+    if (configuration.size() == 1) {
+      reader.setSafelyDeletedSearchIndex(searchIndex.get());
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 88b25486b7..1e36e20b70 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -41,7 +41,7 @@ public class IoTConsensusServerMetrics implements IMetricSet {
             Metric.IOT_CONSENSUS.toString(),
             MetricLevel.IMPORTANT,
             impl,
-            IoTConsensusServerImpl::getIndex,
+            IoTConsensusServerImpl::getSearchIndex,
             Tag.NAME.toString(),
             "ioTConsensusServerImpl",
             Tag.REGION.toString(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 752a392ed1..93de711b76 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -344,7 +344,7 @@ public class LogDispatcher {
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndex;
       synchronized (impl.getIndexObject()) {
-        maxIndex = impl.getIndex() + 1;
+        maxIndex = impl.getSearchIndex() + 1;
         logger.debug(
             "{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}",
             impl.getThisNode().getGroupId(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index f966528cdc..6b24f596a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -236,7 +236,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy
       resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
       return;
     }
-    long searchIndex = impl.getIndex();
+    long searchIndex = impl.getSearchIndex();
     long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
     resultHandler.onComplete(
         new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index e1675e7994..a36db6e3ea 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -120,17 +120,17 @@ public class ReplicateTest {
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
     servers.get(2).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
       servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex());
     }
 
     for (int i = 0; i < 3; i++) {
@@ -163,9 +163,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
@@ -197,14 +197,14 @@ public class ReplicateTest {
     servers.get(0).createPeer(group.getGroupId(), group.getPeers());
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
     }
 
     Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -219,9 +219,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 2; i++) {
       long start = System.currentTimeMillis();