You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/04 03:20:31 UTC
[iotdb] branch expr_plus updated: fix tests
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_plus by this push:
new 8525f85 fix tests
8525f85 is described below
commit 8525f85b7fbfab828e22e31b23deae897a037f66
Author: jt <jt...@163.com>
AuthorDate: Fri Mar 4 11:19:46 2022 +0800
fix tests
---
cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java | 2 +-
.../main/java/org/apache/iotdb/cluster/log/VotingLogList.java | 4 +++-
.../server/handlers/caller/AppendNodeEntryHandler.java | 11 ++++++-----
.../org/apache/iotdb/cluster/server/member/RaftMember.java | 6 +++++-
.../server/handlers/caller/AppendNodeEntryHandlerTest.java | 2 ++
.../sizetiered/SizeTieredCompactionHandleExceptionTest.java | 1 +
6 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 571ec3a..efa0950 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -34,7 +34,7 @@ public abstract class Log implements Comparable<Log> {
// make this configurable or adaptive
protected static final int DEFAULT_BUFFER_SIZE = 16 * 1024;
- private long currLogIndex = Long.MIN_VALUE;
+ private volatile long currLogIndex = Long.MIN_VALUE;
private long currLogTerm = -1;
// for async application
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 1856ed5..f5cce6c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -65,7 +65,9 @@ public class VotingLogList {
VotingLog votingLog = logList.get(i);
if (votingLog.getLog().getCurrLogIndex() <= index
&& votingLog.getLog().getCurrLogTerm() == term) {
- votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
+ synchronized (votingLog) {
+ votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
+ }
if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
lastEntryIndexToCommit = i;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b1b2efb..9cae7e8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -35,6 +35,7 @@ import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_OUT_OF_WINDOW;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
@@ -86,7 +87,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
long resp = response.status;
- if (resp == RESPONSE_STRONG_ACCEPT) {
+ if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
member
.getVotingLogList()
.onStronglyAccept(
@@ -109,8 +110,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
receiverTerm.set(resp);
}
leaderShipStale.set(true);
- synchronized (log.getStronglyAcceptedNodeIds()) {
- log.getStronglyAcceptedNodeIds().notifyAll();
+ synchronized (log) {
+ log.notifyAll();
}
} else if (resp == RESPONSE_WEAK_ACCEPT) {
synchronized (log) {
@@ -119,7 +120,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
>= quorumSize) {
log.acceptedTime.set(System.nanoTime());
}
- log.getStronglyAcceptedNodeIds().notifyAll();
+ log.notifyAll();
}
} else {
// e.g., Response.RESPONSE_LOG_MISMATCH
@@ -158,7 +159,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
if (log.getFailedNodeIds().size() > quorumSize) {
// quorum members have failed, there is no need to wait for others
log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
- log.getStronglyAcceptedNodeIds().notifyAll();
+ log.notifyAll();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 9eb9754..a28f220 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1704,7 +1704,7 @@ public abstract class RaftMember implements RaftMemberMBean {
long alreadyWait = 0;
synchronized (log) {
- while (log.getLog().getCurrLogIndex() == -1
+ while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| stronglyAcceptedNodeNum < quorumSize
&& (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
|| (totalAccepted < quorumSize)
@@ -1770,6 +1770,10 @@ public abstract class RaftMember implements RaftMemberMBean {
&& !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE))) {
waitAppendResultLoop(log, quorumSize);
}
+ stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+ weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+ totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+
if (log.acceptedTime.get() != 0) {
Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index e84bc89..77111cb 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -70,6 +70,7 @@ public class AppendNodeEntryHandlerTest {
try {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
VotingLog votingLog = new VotingLog(log, 10);
+ member.getVotingLogList().insert(votingLog);
Peer peer = new Peer(1);
for (int i = 0; i < 10; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -104,6 +105,7 @@ public class AppendNodeEntryHandlerTest {
AtomicBoolean leadershipStale = new AtomicBoolean(false);
Log log = new TestLog();
VotingLog votingLog = new VotingLog(log, 10);
+ member.getVotingLogList().insert(votingLog);
Peer peer = new Peer(1);
for (int i = 0; i < 3; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
index 0371c9d..84bbe59 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
+import java.util.concurrent.RunnableFuture;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTest;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.exception.StorageEngineException;