You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/04 09:55:32 UTC

[iotdb] branch cluster_divide_condition updated: divide log update condition into multiple instances

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_divide_condition
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_divide_condition by this push:
     new f7856f1  divide log update condition into multiple instances
f7856f1 is described below

commit f7856f14289ca6f69ad77ab7ada87607054e8a9c
Author: jt <jt...@163.com>
AuthorDate: Fri Dec 4 17:54:02 2020 +0800

    divide log update condition into multiple instances
---
 .../iotdb/cluster/log/manage/RaftLogManager.java       | 18 +++++++++++++-----
 .../apache/iotdb/cluster/server/member/RaftMember.java |  4 ++--
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..28fd182 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -114,7 +114,7 @@ public abstract class RaftLogManager {
    * Each time new logs are appended, this condition will be notified so logs that have larger
    * indices but arrived earlier can proceed.
    */
-  private final Object logUpdateCondition = new Object();
+  private final Object[] logUpdateConditions = new Object[100];
 
   private List<Log> blockedUnappliedLogList;
 
@@ -176,6 +176,10 @@ public abstract class RaftLogManager {
     if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
       this.applyAllCommittedLogWhenStartUp();
     }
+
+    for (int i = 0; i < logUpdateConditions.length; i++) {
+      logUpdateConditions[i] = new Object();
+    }
   }
 
   public Snapshot getSnapshot() {
@@ -426,8 +430,11 @@ public abstract class RaftLogManager {
       return -1;
     }
     getUnCommittedEntryManager().truncateAndAppend(entries);
-    synchronized (logUpdateCondition) {
-      logUpdateCondition.notifyAll();
+    for (Log entry : entries) {
+      Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex());
+      synchronized (logUpdateCondition) {
+        logUpdateCondition.notifyAll();
+      }
     }
     return getLastLogIndex();
   }
@@ -446,6 +453,7 @@ public abstract class RaftLogManager {
       return -1;
     }
     getUnCommittedEntryManager().truncateAndAppend(entry);
+    Object logUpdateCondition = getLogUpdateCondition(after);
     synchronized (logUpdateCondition) {
       logUpdateCondition.notifyAll();
     }
@@ -835,8 +843,8 @@ public abstract class RaftLogManager {
   }
 
 
-  public Object getLogUpdateCondition() {
-    return logUpdateCondition;
+  public Object getLogUpdateCondition(long index) {
+    return logUpdateConditions[(int) (index % logUpdateConditions.length)];
   }
 
   void applyAllCommittedLogWhenStartUp() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 25201e9..722b185 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1756,7 +1756,7 @@ public abstract class RaftMember {
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
-    Object logUpdateCondition = logManager.getLogUpdateCondition();
+    Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
     while (logManager.getLastLogIndex() < prevLogIndex &&
         alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
       try {
@@ -1766,7 +1766,7 @@ public abstract class RaftMember {
           return true;
         }
         synchronized (logUpdateCondition) {
-          logUpdateCondition.wait(1);
+          logUpdateCondition.wait(prevLogIndex - lastLogIndex);
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();