You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/31 03:15:58 UTC
[iotdb] 01/01: split condition for waitForPrevLog into multiple conditions
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_split_condition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9da7b7290f433d63032297bf3704916d5498ddfd
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 31 11:12:55 2020 +0800
split condition for waitForPrevLog into multiple conditions
---
.../cluster/log/manage/CommittedEntryManager.java | 5 +--
.../iotdb/cluster/log/manage/RaftLogManager.java | 15 ++++++---
.../log/manage/UnCommittedEntryManager.java | 10 ++++--
.../iotdb/cluster/server/member/RaftMember.java | 38 +++++++++++++---------
4 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index 259b3e5..e65be0c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -98,8 +98,9 @@ public class CommittedEntryManager {
*
* @return entries's size
*/
- long getTotalSize() {
- return getLastIndex() - getFirstIndex() + 1;
+ int getTotalSize() {
+ // the first one is a sentry
+ return entries.size() - 1;
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..1c34f70 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -114,7 +114,7 @@ public abstract class RaftLogManager {
* Each time new logs are appended, this condition will be notified so logs that have larger
* indices but arrived earlier can proceed.
*/
- private final Object logUpdateCondition = new Object();
+ private final Object[] logUpdateConditions = new Object[1024];
private List<Log> blockedUnappliedLogList;
@@ -176,6 +176,10 @@ public abstract class RaftLogManager {
if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
this.applyAllCommittedLogWhenStartUp();
}
+
+ for (int i = 0; i < logUpdateConditions.length; i++) {
+ logUpdateConditions[i] = new Object();
+ }
}
public Snapshot getSnapshot() {
@@ -426,6 +430,8 @@ public abstract class RaftLogManager {
return -1;
}
getUnCommittedEntryManager().truncateAndAppend(entries);
+ Object logUpdateCondition = getLogUpdateCondition(
+ entries.get(entries.size() - 1).getCurrLogIndex());
synchronized (logUpdateCondition) {
logUpdateCondition.notifyAll();
}
@@ -446,6 +452,7 @@ public abstract class RaftLogManager {
return -1;
}
getUnCommittedEntryManager().truncateAndAppend(entry);
+ Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex());
synchronized (logUpdateCondition) {
logUpdateCondition.notifyAll();
}
@@ -568,7 +575,7 @@ public abstract class RaftLogManager {
.clear();
}
try {
- int currentSize = (int) committedEntryManager.getTotalSize();
+ int currentSize = committedEntryManager.getTotalSize();
int deltaSize = entries.size();
if (currentSize + deltaSize > maxNumOfLogsInMem) {
int sizeToReserveForNew = maxNumOfLogsInMem - deltaSize;
@@ -835,8 +842,8 @@ public abstract class RaftLogManager {
}
- public Object getLogUpdateCondition() {
- return logUpdateCondition;
+ public Object getLogUpdateCondition(long logIndex) {
+ return logUpdateConditions[(int) (logIndex % logUpdateConditions.length)];
}
void applyAllCommittedLogWhenStartUp() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index 07ae450..d83aed7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -59,8 +59,14 @@ public class UnCommittedEntryManager {
*/
long maybeLastIndex() {
int entryNum = entries.size();
- if (entryNum != 0) {
- return offset + entryNum - 1;
+ while (entryNum != 0) {
+ try {
+ return entries.get(entryNum - 1).getCurrLogIndex();
+ } catch (IndexOutOfBoundsException e) {
+ // the exception is thrown when there is a concurrent deletion, which is rare, so a retry
+ // is usually enough and it is not likely that we will get stuck here
+ entryNum = entries.size();
+ }
}
return -1;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d781669..4e7442c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -57,12 +57,16 @@ import org.apache.iotdb.cluster.log.CommitLogCallback;
import org.apache.iotdb.cluster.log.CommitLogTask;
import org.apache.iotdb.cluster.log.HardState;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
+import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -1738,17 +1742,18 @@ public abstract class RaftMember {
return resp;
}
+ long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+ long success;
synchronized (logManager) {
- long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
- long success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
- Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
- if (success != -1) {
- logger.debug("{} append a new log {}", name, log);
- resp = Response.RESPONSE_AGREE;
- } else {
- // the incoming log points to an illegal position, reject it
- resp = Response.RESPONSE_LOG_MISMATCH;
- }
+ success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+ }
+ Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+ if (success != -1) {
+ logger.debug("{} append a new log {}", name, log);
+ resp = Response.RESPONSE_AGREE;
+ } else {
+ // the incoming log points to an illegal position, reject it
+ resp = Response.RESPONSE_LOG_MISMATCH;
}
return resp;
}
@@ -1759,18 +1764,19 @@ public abstract class RaftMember {
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
- Object logUpdateCondition = logManager.getLogUpdateCondition();
- while (logManager.getLastLogIndex() < prevLogIndex &&
+ Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
+ long lastLogIndex = logManager.getLastLogIndex();
+ while (lastLogIndex < prevLogIndex &&
alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
try {
// each time new logs are appended, this will be notified
- long lastLogIndex = logManager.getLastLogIndex();
- if (lastLogIndex >= prevLogIndex) {
- return true;
- }
synchronized (logUpdateCondition) {
logUpdateCondition.wait(1);
}
+ lastLogIndex = logManager.getLastLogIndex();
+ if (lastLogIndex >= prevLogIndex) {
+ return true;
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;