You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/04 03:20:31 UTC

[iotdb] branch expr_plus updated: fix tests

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_plus by this push:
     new 8525f85  fix tests
8525f85 is described below

commit 8525f85b7fbfab828e22e31b23deae897a037f66
Author: jt <jt...@163.com>
AuthorDate: Fri Mar 4 11:19:46 2022 +0800

    fix tests
---
 cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java   |  2 +-
 .../main/java/org/apache/iotdb/cluster/log/VotingLogList.java |  4 +++-
 .../server/handlers/caller/AppendNodeEntryHandler.java        | 11 ++++++-----
 .../org/apache/iotdb/cluster/server/member/RaftMember.java    |  6 +++++-
 .../server/handlers/caller/AppendNodeEntryHandlerTest.java    |  2 ++
 .../sizetiered/SizeTieredCompactionHandleExceptionTest.java   |  1 +
 6 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 571ec3a..efa0950 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -34,7 +34,7 @@ public abstract class Log implements Comparable<Log> {
 
   // make this configurable or adaptive
   protected static final int DEFAULT_BUFFER_SIZE = 16 * 1024;
-  private long currLogIndex = Long.MIN_VALUE;
+  private volatile long currLogIndex = Long.MIN_VALUE;
   private long currLogTerm = -1;
 
   // for async application
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 1856ed5..f5cce6c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -65,7 +65,9 @@ public class VotingLogList {
         VotingLog votingLog = logList.get(i);
         if (votingLog.getLog().getCurrLogIndex() <= index
             && votingLog.getLog().getCurrLogTerm() == term) {
-          votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
+          synchronized (votingLog) {
+            votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
+          }
           if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
             lastEntryIndexToCommit = i;
           }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b1b2efb..9cae7e8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -35,6 +35,7 @@ import java.net.ConnectException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_OUT_OF_WINDOW;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
@@ -86,7 +87,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
 
     long resp = response.status;
 
-    if (resp == RESPONSE_STRONG_ACCEPT) {
+    if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
       member
           .getVotingLogList()
           .onStronglyAccept(
@@ -109,8 +110,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
         receiverTerm.set(resp);
       }
       leaderShipStale.set(true);
-      synchronized (log.getStronglyAcceptedNodeIds()) {
-        log.getStronglyAcceptedNodeIds().notifyAll();
+      synchronized (log) {
+        log.notifyAll();
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (log) {
@@ -119,7 +120,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
             >= quorumSize) {
           log.acceptedTime.set(System.nanoTime());
         }
-        log.getStronglyAcceptedNodeIds().notifyAll();
+        log.notifyAll();
       }
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
@@ -158,7 +159,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       if (log.getFailedNodeIds().size() > quorumSize) {
         // quorum members have failed, there is no need to wait for others
         log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
-        log.getStronglyAcceptedNodeIds().notifyAll();
+        log.notifyAll();
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 9eb9754..a28f220 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1704,7 +1704,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     long alreadyWait = 0;
 
     synchronized (log) {
-      while (log.getLog().getCurrLogIndex() == -1
+      while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
           || stronglyAcceptedNodeNum < quorumSize
               && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
                   || (totalAccepted < quorumSize)
@@ -1770,6 +1770,10 @@ public abstract class RaftMember implements RaftMemberMBean {
             && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE))) {
       waitAppendResultLoop(log, quorumSize);
     }
+    stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+    weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+    totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+
     if (log.acceptedTime.get() != 0) {
       Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index e84bc89..77111cb 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -70,6 +70,7 @@ public class AppendNodeEntryHandlerTest {
     try {
       ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
       VotingLog votingLog = new VotingLog(log, 10);
+      member.getVotingLogList().insert(votingLog);
       Peer peer = new Peer(1);
       for (int i = 0; i < 10; i++) {
         AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -104,6 +105,7 @@ public class AppendNodeEntryHandlerTest {
     AtomicBoolean leadershipStale = new AtomicBoolean(false);
     Log log = new TestLog();
     VotingLog votingLog = new VotingLog(log, 10);
+    member.getVotingLogList().insert(votingLog);
     Peer peer = new Peer(1);
 
     for (int i = 0; i < 3; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
index 0371c9d..84bbe59 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
 
+import java.util.concurrent.RunnableFuture;
 import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTest;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.exception.StorageEngineException;