You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/04 09:55:32 UTC
[iotdb] branch cluster_divide_condition updated: divide log update condition into multiple instances
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_divide_condition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_divide_condition by this push:
new f7856f1 divide log update condition into multiple instances
f7856f1 is described below
commit f7856f14289ca6f69ad77ab7ada87607054e8a9c
Author: jt <jt...@163.com>
AuthorDate: Fri Dec 4 17:54:02 2020 +0800
divide log update condition into multiple instances
---
.../iotdb/cluster/log/manage/RaftLogManager.java | 18 +++++++++++++-----
.../apache/iotdb/cluster/server/member/RaftMember.java | 4 ++--
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..28fd182 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -114,7 +114,7 @@ public abstract class RaftLogManager {
* Each time new logs are appended, this condition will be notified so logs that have larger
* indices but arrived earlier can proceed.
*/
- private final Object logUpdateCondition = new Object();
+ private final Object[] logUpdateConditions = new Object[100];
private List<Log> blockedUnappliedLogList;
@@ -176,6 +176,10 @@ public abstract class RaftLogManager {
if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
this.applyAllCommittedLogWhenStartUp();
}
+
+ for (int i = 0; i < logUpdateConditions.length; i++) {
+ logUpdateConditions[i] = new Object();
+ }
}
public Snapshot getSnapshot() {
@@ -426,8 +430,11 @@ public abstract class RaftLogManager {
return -1;
}
getUnCommittedEntryManager().truncateAndAppend(entries);
- synchronized (logUpdateCondition) {
- logUpdateCondition.notifyAll();
+ for (Log entry : entries) {
+ Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex());
+ synchronized (logUpdateCondition) {
+ logUpdateCondition.notifyAll();
+ }
}
return getLastLogIndex();
}
@@ -446,6 +453,7 @@ public abstract class RaftLogManager {
return -1;
}
getUnCommittedEntryManager().truncateAndAppend(entry);
+ Object logUpdateCondition = getLogUpdateCondition(after);
synchronized (logUpdateCondition) {
logUpdateCondition.notifyAll();
}
@@ -835,8 +843,8 @@ public abstract class RaftLogManager {
}
- public Object getLogUpdateCondition() {
- return logUpdateCondition;
+ public Object getLogUpdateCondition(long index) {
+ return logUpdateConditions[(int) (index % logUpdateConditions.length)];
}
void applyAllCommittedLogWhenStartUp() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 25201e9..722b185 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1756,7 +1756,7 @@ public abstract class RaftMember {
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
- Object logUpdateCondition = logManager.getLogUpdateCondition();
+ Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
while (logManager.getLastLogIndex() < prevLogIndex &&
alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
try {
@@ -1766,7 +1766,7 @@ public abstract class RaftMember {
return true;
}
synchronized (logUpdateCondition) {
- logUpdateCondition.wait(1);
+ logUpdateCondition.wait(prevLogIndex - lastLogIndex);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();