You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/31 03:15:58 UTC

[iotdb] 01/01: split condition for waitPrevLog into multiple conditions

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_split_condition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9da7b7290f433d63032297bf3704916d5498ddfd
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 31 11:12:55 2020 +0800

    split condition for waitPrevLog into multiple conditions
---
 .../cluster/log/manage/CommittedEntryManager.java  |  5 +--
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 15 ++++++---
 .../log/manage/UnCommittedEntryManager.java        | 10 ++++--
 .../iotdb/cluster/server/member/RaftMember.java    | 38 +++++++++++++---------
 4 files changed, 44 insertions(+), 24 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index 259b3e5..e65be0c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -98,8 +98,9 @@ public class CommittedEntryManager {
    *
    * @return entries's size
    */
-  long getTotalSize() {
-    return getLastIndex() - getFirstIndex() + 1;
+  int getTotalSize() {
+    // the first one is a sentinel
+    return entries.size() - 1;
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..1c34f70 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -114,7 +114,7 @@ public abstract class RaftLogManager {
    * Each time new logs are appended, this condition will be notified so logs that have larger
    * indices but arrived earlier can proceed.
    */
-  private final Object logUpdateCondition = new Object();
+  private final Object[] logUpdateConditions = new Object[1024];
 
   private List<Log> blockedUnappliedLogList;
 
@@ -176,6 +176,10 @@ public abstract class RaftLogManager {
     if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
       this.applyAllCommittedLogWhenStartUp();
     }
+
+    for (int i = 0; i < logUpdateConditions.length; i++) {
+      logUpdateConditions[i] = new Object();
+    }
   }
 
   public Snapshot getSnapshot() {
@@ -426,6 +430,8 @@ public abstract class RaftLogManager {
       return -1;
     }
     getUnCommittedEntryManager().truncateAndAppend(entries);
+    Object logUpdateCondition = getLogUpdateCondition(
+        entries.get(entries.size() - 1).getCurrLogIndex());
     synchronized (logUpdateCondition) {
       logUpdateCondition.notifyAll();
     }
@@ -446,6 +452,7 @@ public abstract class RaftLogManager {
       return -1;
     }
     getUnCommittedEntryManager().truncateAndAppend(entry);
+    Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex());
     synchronized (logUpdateCondition) {
       logUpdateCondition.notifyAll();
     }
@@ -568,7 +575,7 @@ public abstract class RaftLogManager {
           .clear();
     }
     try {
-      int currentSize = (int) committedEntryManager.getTotalSize();
+      int currentSize = committedEntryManager.getTotalSize();
       int deltaSize = entries.size();
       if (currentSize + deltaSize > maxNumOfLogsInMem) {
         int sizeToReserveForNew = maxNumOfLogsInMem - deltaSize;
@@ -835,8 +842,8 @@ public abstract class RaftLogManager {
   }
 
 
-  public Object getLogUpdateCondition() {
-    return logUpdateCondition;
+  public Object getLogUpdateCondition(long logIndex) {
+    return logUpdateConditions[(int) Math.floorMod(logIndex, logUpdateConditions.length)];
   }
 
   void applyAllCommittedLogWhenStartUp() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index 07ae450..d83aed7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -59,8 +59,14 @@ public class UnCommittedEntryManager {
    */
   long maybeLastIndex() {
     int entryNum = entries.size();
-    if (entryNum != 0) {
-      return offset + entryNum - 1;
+    while (entryNum != 0) {
+      try {
+        return entries.get(entryNum - 1).getCurrLogIndex();
+      } catch (IndexOutOfBoundsException e) {
+        // the exception is thrown when there is a concurrent deletion, which is rare, so a retry
+        // is usually enough and it is not likely that we will get stuck here
+        entryNum = entries.size();
+      }
     }
     return -1;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d781669..4e7442c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -57,12 +57,16 @@ import org.apache.iotdb.cluster.log.CommitLogCallback;
 import org.apache.iotdb.cluster.log.CommitLogTask;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
+import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -1738,17 +1742,18 @@ public abstract class RaftMember {
       return resp;
     }
 
+    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    long success;
     synchronized (logManager) {
-      long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
-      long success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
-      Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
-      if (success != -1) {
-        logger.debug("{} append a new log {}", name, log);
-        resp = Response.RESPONSE_AGREE;
-      } else {
-        // the incoming log points to an illegal position, reject it
-        resp = Response.RESPONSE_LOG_MISMATCH;
-      }
+      success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+    }
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+    if (success != -1) {
+      logger.debug("{} append a new log {}", name, log);
+      resp = Response.RESPONSE_AGREE;
+    } else {
+      // the incoming log points to an illegal position, reject it
+      resp = Response.RESPONSE_LOG_MISMATCH;
     }
     return resp;
   }
@@ -1759,18 +1764,19 @@ public abstract class RaftMember {
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
-    Object logUpdateCondition = logManager.getLogUpdateCondition();
-    while (logManager.getLastLogIndex() < prevLogIndex &&
+    Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
+    long lastLogIndex = logManager.getLastLogIndex();
+    while (lastLogIndex < prevLogIndex &&
         alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
       try {
         // each time new logs are appended, this will be notified
-        long lastLogIndex = logManager.getLastLogIndex();
-        if (lastLogIndex >= prevLogIndex) {
-          return true;
-        }
         synchronized (logUpdateCondition) {
           logUpdateCondition.wait(1);
         }
+        lastLogIndex = logManager.getLastLogIndex();
+        if (lastLogIndex >= prevLogIndex) {
+          return true;
+        }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;