You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/08/09 02:49:54 UTC
[iotdb] branch expr updated: add voting log and fix tests
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new 2cb779a add voting log and fix tests
2cb779a is described below
commit 2cb779a4c5391002d3744d963c76c1f2dfcdf144
Author: jt <jt...@163.com>
AuthorDate: Mon Aug 9 10:47:46 2021 +0800
add voting log and fix tests
---
cluster/distribute.sh | 4 +-
.../cluster/expr/ExprAppendNodeEntryHandler.java | 25 ++++
.../org/apache/iotdb/cluster/expr/ExprBench.java | 1 +
.../org/apache/iotdb/cluster/expr/ExprMember.java | 146 ++++++++++++++++-----
.../apache/iotdb/cluster/expr/ExprVotingLog.java | 58 ++++++++
.../apache/iotdb/cluster/expr/VotingLogList.java | 87 ++++++++++++
.../iotdb/cluster/log/IndirectLogDispatcher.java | 11 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 93 ++++---------
.../org/apache/iotdb/cluster/log/VotingLog.java | 49 +++++++
.../iotdb/cluster/log/catchup/LogCatchUpTask.java | 5 +-
.../handlers/caller/AppendGroupEntryHandler.java | 43 +++---
.../handlers/caller/AppendNodeEntryHandler.java | 48 +++----
.../server/handlers/caller/LogCatchUpHandler.java | 7 +-
.../handlers/caller/LogCatchUpInBatchHandler.java | 7 +-
.../cluster/server/member/MetaGroupMember.java | 107 ++++++++++-----
.../iotdb/cluster/server/member/RaftMember.java | 123 ++++++++---------
.../iotdb/cluster/common/TestAsyncDataClient.java | 5 +-
.../iotdb/cluster/log/LogDispatcherTest.java | 21 +--
.../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 25 ++--
.../cluster/log/catchup/LogCatchUpTaskTest.java | 19 +--
.../log/catchup/SnapshotCatchUpTaskTest.java | 9 +-
.../caller/AppendGroupEntryHandlerTest.java | 38 +++---
.../caller/AppendNodeEntryHandlerTest.java | 65 +++++----
.../handlers/caller/LogCatchUpHandlerTest.java | 7 +-
.../cluster/server/member/MetaGroupMemberTest.java | 5 +-
25 files changed, 657 insertions(+), 351 deletions(-)
diff --git a/cluster/distribute.sh b/cluster/distribute.sh
index 98758da..7981466 100644
--- a/cluster/distribute.sh
+++ b/cluster/distribute.sh
@@ -1,6 +1,6 @@
src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
-ips=(fit36 fit38 fit39)
+ips=(fit35 fit36 fit38 fit39)
target_lib_path=/data/iotdb_expr/lib/
for ip in ${ips[*]}
@@ -9,7 +9,7 @@ for ip in ${ips[*]}
scp -r $src_lib_path fit@$ip:$target_lib_path
done
-ips=(fit31 fit32 fit33 fit34)
+ips=(fit31 fit33 fit34)
target_lib_path=/disk/iotdb_expr/lib/
for ip in ${ips[*]}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprAppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprAppendNodeEntryHandler.java
new file mode 100644
index 0000000..bf59649
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprAppendNodeEntryHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
+
/**
 * Placeholder handler for the "expr" experiment branch. Currently identical to
 * {@link AppendNodeEntryHandler}; exists so experiment-specific response handling can be
 * overridden here without touching the production handler.
 */
public class ExprAppendNodeEntryHandler extends AppendNodeEntryHandler {
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index b625f2c..17f1b4d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -98,6 +98,7 @@ public class ExprBench {
ExprBench bench = new ExprBench(target);
bench.maxRequestNum = Integer.parseInt(args[2]);
bench.threadNum = Integer.parseInt(args[3]);
+ bench.workloadSize = Integer.parseInt(args[4]) * 1024;
bench.benchmark();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 0e4c8c0..3c6189b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.expr;
import java.nio.ByteBuffer;
+import java.sql.Time;
import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.cluster.coordinator.Coordinator;
@@ -31,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -46,12 +48,21 @@ public class ExprMember extends MetaGroupMember {
public static boolean bypassRaft = false;
public static boolean useSlidingWindow = false;
+ private VotingLogList votingLogList;
+
+ private int windowSize = 10000;
+ private Log[] logWindow = new Log[windowSize];
+ private long[] prevIndices = new long[windowSize];
+ private long[] prevTerms = new long[windowSize];
+
+
public ExprMember() {
}
public ExprMember(Node thisNode, List<Node> allNodes) {
this.thisNode = thisNode;
this.allNodes = allNodes;
+ this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
}
public ExprMember(TProtocolFactory factory,
@@ -60,15 +71,15 @@ public class ExprMember extends MetaGroupMember {
super(factory, thisNode, coordinator);
}
- private int windowSize = 10000;
- private Log[] logWindow = new Log[windowSize];
- private long windowPrevLogIndex;
- private long windowPrevLogTerm;
- private long windowTerm;
+ @Override
+ public void setAllNodes(List<Node> allNodes) {
+ super.setAllNodes(allNodes);
+ this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
+ }
@Override
protected synchronized void startSubServers() {
-
+ // do not start data groups in such experiments
}
@Override
@@ -90,7 +101,10 @@ public class ExprMember extends MetaGroupMember {
if (!ClusterUtils.isNodeEquals(node, thisNode)) {
Client syncClient = getSyncClient(node);
try {
+ long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG
+ .getOperationStartTime();
syncClient.executeNonQueryPlan(req);
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
} catch (TException e) {
ClientUtils.putBackSyncClient(syncClient);
return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
@@ -103,46 +117,110 @@ public class ExprMember extends MetaGroupMember {
return processNonPartitionedMetaPlan(plan);
}
- protected AppendEntryResult appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
  /**
   * After an entry is inserted into the window at {@code pos}, evict its neighboring entries if
   * their recorded previous (index, term) no longer chains consistently with this entry, i.e.,
   * they were replicated under a different history.
   *
   * @param pos position in the window where a new entry was just placed
   */
  private void checkLog(int pos) {
    checkLogPrev(pos);
    checkLogNext(pos);
  }
+
+ private void checkLogPrev(int pos) {
+ // check the previous entry
+ long prevLogIndex = prevIndices[pos];
+ long prevLogTerm = prevTerms[pos];
+ if (pos > 0) {
+ Log prev = logWindow[pos - 1];
+ if (prev != null && (prev.getCurrLogIndex() != prevLogIndex || prev.getCurrLogTerm() != prevLogTerm)) {
+ logWindow[pos - 1] = null;
+ }
+ }
+ }
+
+ private void checkLogNext(int pos) {
+ // check the next entry
+ Log log = logWindow[pos];
+ boolean nextMismatch = false;
+ if (pos < windowSize - 1) {
+ long nextPrevIndex = prevIndices[pos + 1];
+ long nextPrevTerm = prevTerms[pos + 1];
+ if (!(nextPrevIndex != log.getCurrLogIndex() || nextPrevTerm != log.getCurrLogTerm())) {
+ nextMismatch = true;
+ }
+ }
+ if (nextMismatch) {
+ for (int i = pos + 1; i < windowSize; i++) {
+ if (logWindow[i] != null) {
+ logWindow[i] = null;
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Flush window range [0, flushPos) into the LogManager, where flushPos is the first null
+ * position in the window.
+ * @param result
+ * @param leaderCommit
+ * @return
+ */
+ private long flushWindow(AppendEntryResult result, long leaderCommit) {
+ long windowPrevLogIndex = prevIndices[0];
+ long windowPrevLogTerm = prevTerms[0];
+
+ int flushPos = 0;
+ for (; flushPos < windowSize; flushPos++) {
+ if (logWindow[flushPos] == null) {
+ break;
+ }
+ }
+ // flush [0, flushPos)
+ List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+ long success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit,
+ logs);
+ if (success != -1) {
+ System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos);
+ for (int i = 1; i <= flushPos; i++) {
+ logWindow[windowSize - i] = null;
+ }
+ }
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
+ return success;
+ }
+
+ protected AppendEntryResult appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit,
+ Log log) {
if (!useSlidingWindow) {
return super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
}
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
- long success = 0;
+ long appendedPos = 0;
AppendEntryResult result = new AppendEntryResult();
synchronized (logManager) {
- long windowPos = log.getCurrLogIndex() - logManager.getLastLogIndex() - 1;
+ int windowPos = (int) (log.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
if (windowPos < 0) {
- success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+ // the new entry may replace an appended entry
+ appendedPos = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
} else if (windowPos < windowSize) {
- logWindow[(int) windowPos] = log;
+ // the new entry falls into the window
+ logWindow[windowPos] = log;
+ prevIndices[windowPos] = prevLogIndex;
+ prevTerms[windowPos] = prevLogTerm;
+ checkLog(windowPos);
+
if (windowPos == 0) {
- windowPrevLogIndex = prevLogIndex;
- windowPrevLogTerm = prevLogTerm;
-
- int flushPos = 0;
- for (; flushPos < windowSize; flushPos++) {
- if (logWindow[flushPos] == null) {
- break;
- }
- }
- // flush [0, flushPos)
- List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
- success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit,
- logs);
- if (success != -1) {
- System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos);
- for (int i = 1; i <= flushPos; i++) {
- logWindow[windowSize - i] = null;
- }
- }
- result.status = Response.RESPONSE_STRONG_ACCEPT;
- result.setLastLogIndex(logManager.getLastLogIndex());
- result.setLastLogTerm(logManager.getLastLogTerm());
+ appendedPos = flushWindow(result, leaderCommit);
} else {
result.status = Response.RESPONSE_WEAK_ACCEPT;
}
@@ -152,7 +230,7 @@ public class ExprMember extends MetaGroupMember {
}
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
- if (success == -1) {
+ if (appendedPos == -1) {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprVotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprVotingLog.java
new file mode 100644
index 0000000..bcbb126
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprVotingLog.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.VotingLog;
+
/**
 * A {@link VotingLog} for the "expr" experiment branch that additionally tracks the nodes that
 * have only weakly accepted the entry (accepted out of order, not yet durable at the expected
 * position).
 */
public class ExprVotingLog extends VotingLog {
  // ids of nodes that weakly accepted this entry; strong accepts live in the superclass
  private Set<Integer> weaklyAcceptedNodeIds;

  public ExprVotingLog(Log log, int groupSize) {
    super(log, groupSize);
    // presized to the group size since at most one vote per member is recorded
    this.weaklyAcceptedNodeIds = new HashSet<>(groupSize);
  }

  public Log getLog() {
    return log;
  }

  public void setLog(Log log) {
    this.log = log;
  }

  public Set<Integer> getStronglyAcceptedNodeIds() {
    return stronglyAcceptedNodeIds;
  }

  public void setStronglyAcceptedNodeIds(Set<Integer> stronglyAcceptedNodeIds) {
    this.stronglyAcceptedNodeIds = stronglyAcceptedNodeIds;
  }

  public Set<Integer> getWeaklyAcceptedNodeIds() {
    return weaklyAcceptedNodeIds;
  }

  public void setWeaklyAcceptedNodeIds(Set<Integer> weaklyAcceptedNodeIds) {
    this.weaklyAcceptedNodeIds = weaklyAcceptedNodeIds;
  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
new file mode 100644
index 0000000..695d37b
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.cluster.log.VotingLog;
+
+public class VotingLogList {
+
+ private List<ExprVotingLog> logList = new ArrayList<>();
+ private volatile long currTerm = -1;
+ private int quorumSize;
+
+ public VotingLogList(int quorumSize) {
+ this.quorumSize = quorumSize;
+ }
+
+ /**
+ * Insert a voting entry into the list. Notice the logs must be inserted in order of index, as
+ * they are inserted as soon as created
+ *
+ * @param log
+ */
+ public synchronized void insert(ExprVotingLog log) {
+ if (log.getLog().getCurrLogTerm() != currTerm) {
+ logList.clear();
+ currTerm = log.getLog().getCurrLogTerm();
+ }
+ logList.add(log);
+ }
+
+
+ /**
+ * When an entry of index-term is strongly accepted by a node of acceptingNodeId, record the id in
+ * all entries whose index <= the accepted entry. If any entry is accepted by a quorum, remove
+ * it from the list.
+ *
+ * @param index
+ * @param term
+ * @param acceptingNodeId
+ * @return the lastly removed entry if any.
+ */
+ public synchronized VotingLog onStronglyAccept(long index, long term, int acceptingNodeId) {
+ int lastEntryIndexToCommit = -1;
+ for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
+ ExprVotingLog votingLog = logList.get(i);
+ if (votingLog.getLog().getCurrLogIndex() <= index
+ && votingLog.getLog().getCurrLogTerm() == term) {
+ votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
+ if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
+ lastEntryIndexToCommit = i;
+ }
+ }
+ }
+
+ VotingLog ret = null;
+ if (lastEntryIndexToCommit != -1) {
+ ret = logList.get(lastEntryIndexToCommit);
+ logList.subList(0, lastEntryIndexToCommit + 1).clear();
+ }
+
+ return ret;
+ }
+
+ public synchronized void clear() {
+ logList.clear();
+ }
+
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index ff6bd96..1c63675 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -100,16 +100,17 @@ public class IndirectLogDispatcher extends LogDispatcher {
@Override
void sendLog(SendLogRequest logRequest) {
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- logRequest.getLog().getCreateTime());
+ logRequest.getVotingLog().getLog().getCreateTime());
member.sendLogToFollower(
- logRequest.getLog(),
- logRequest.getVotedNodeIds(),
+ logRequest.getVotingLog(),
receiver,
logRequest.getLeaderShipStale(),
logRequest.getNewLeaderTerm(),
- logRequest.getAppendEntryRequest(), directToIndirectFollowerMap.get(receiver));
+ logRequest.getAppendEntryRequest(),
+ logRequest.getQuorumSize(),
+ directToIndirectFollowerMap.get(receiver));
Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
- logRequest.getLog().getCreateTime());
+ logRequest.getVotingLog().getLog().getCreateTime());
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index b0dd31c..1b7d40a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.log;
-import java.util.Set;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -53,7 +52,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -97,14 +95,14 @@ public class LogDispatcher {
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
- public void offer(SendLogRequest log) {
+ public void offer(SendLogRequest request) {
// do serialization here to avoid taking LogManager for too long
if (!nodeLogQueues.isEmpty()) {
- log.serializedLogFuture =
+ request.serializedLogFuture =
serializationService.submit(
() -> {
- ByteBuffer byteBuffer = log.getLog().serialize();
- log.getLog().setByteSize(byteBuffer.array().length);
+ ByteBuffer byteBuffer = request.getVotingLog().getLog().serialize();
+ request.getVotingLog().getLog().setByteSize(byteBuffer.array().length);
return byteBuffer;
});
}
@@ -115,22 +113,22 @@ public class LogDispatcher {
if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
addSucceeded =
nodeLogQueue.offer(
- log,
+ request,
ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
TimeUnit.MILLISECONDS);
} else {
- addSucceeded = nodeLogQueue.add(log);
+ addSucceeded = nodeLogQueue.add(request);
}
if (!addSucceeded) {
logger.debug(
- "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
+ "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
} else {
- log.setEnqueueTime(System.nanoTime());
+ request.setEnqueueTime(System.nanoTime());
}
} catch (IllegalStateException e) {
logger.debug(
- "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
+ "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -153,8 +151,7 @@ public class LogDispatcher {
public static class SendLogRequest {
- private Log log;
- private Set<Integer> votedNodeIds;
+ private VotingLog votingLog;
private AtomicBoolean leaderShipStale;
private AtomicLong newLeaderTerm;
private AppendEntryRequest appendEntryRequest;
@@ -163,34 +160,24 @@ public class LogDispatcher {
private int quorumSize;
public SendLogRequest(
- Log log,
- Set<Integer> votedNodeIds,
+ VotingLog log,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest appendEntryRequest,
int quorumSize) {
- this.setLog(log);
- this.setVotedNodeIds(votedNodeIds);
+ this.setVotingLog(log);
this.setLeaderShipStale(leaderShipStale);
this.setNewLeaderTerm(newLeaderTerm);
this.setAppendEntryRequest(appendEntryRequest);
this.setQuorumSize(quorumSize);
}
- public Set<Integer> getVotedNodeIds() {
- return votedNodeIds;
+ public VotingLog getVotingLog() {
+ return votingLog;
}
- public void setVotedNodeIds(Set<Integer> votedNodeIds) {
- this.votedNodeIds = votedNodeIds;
- }
-
- public Log getLog() {
- return log;
- }
-
- public void setLog(Log log) {
- this.log = log;
+ public void setVotingLog(VotingLog votingLog) {
+ this.votingLog = votingLog;
}
public long getEnqueueTime() {
@@ -235,7 +222,7 @@ public class LogDispatcher {
@Override
public String toString() {
- return "SendLogRequest{" + "log=" + log + '}';
+ return "SendLogRequest{" + "log=" + votingLog + '}';
}
}
@@ -300,12 +287,12 @@ public class LogDispatcher {
List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch) {
long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- if (!member.waitForPrevLog(peer, currBatch.get(0).getLog())) {
+ if (!member.waitForPrevLog(peer, currBatch.get(0).getVotingLog().getLog())) {
logger.warn(
"{}: node {} timed out when appending {}",
member.getName(),
receiver,
- currBatch.get(0).getLog());
+ currBatch.get(0).getVotingLog());
return;
}
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
@@ -346,7 +333,7 @@ public class LogDispatcher {
request.setEntries(logList);
// set index for raft
- request.setPrevLogIndex(currBatch.get(firstIndex).getLog().getCurrLogIndex() - 1);
+ request.setPrevLogIndex(currBatch.get(firstIndex).getVotingLog().getLog().getCurrLogIndex() - 1);
try {
request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
} catch (Exception e) {
@@ -359,8 +346,8 @@ public class LogDispatcher {
int logIndex = 0;
logger.debug(
"send logs from index {} to {}",
- currBatch.get(0).getLog().getCurrLogIndex(),
- currBatch.get(currBatch.size() - 1).getLog().getCurrLogIndex());
+ currBatch.get(0).getVotingLog().getLog().getCurrLogIndex(),
+ currBatch.get(currBatch.size() - 1).getVotingLog().getLog().getCurrLogIndex());
while (logIndex < currBatch.size()) {
long logSize = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
List<ByteBuffer> logList = new ArrayList<>();
@@ -373,7 +360,7 @@ public class LogDispatcher {
}
logSize -= curSize;
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- currBatch.get(logIndex).getLog().getCreateTime());
+ currBatch.get(logIndex).getVotingLog().getLog().getCreateTime());
logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
}
@@ -385,7 +372,7 @@ public class LogDispatcher {
}
for (; prevIndex < logIndex; prevIndex++) {
Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
- currBatch.get(prevIndex).getLog().getCreateTime());
+ currBatch.get(prevIndex).getVotingLog().getLog().getCreateTime());
}
}
}
@@ -406,17 +393,16 @@ public class LogDispatcher {
void sendLog(SendLogRequest logRequest) {
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- logRequest.getLog().getCreateTime());
+ logRequest.getVotingLog().getLog().getCreateTime());
member.sendLogToFollower(
- logRequest.getLog(),
- logRequest.getVotedNodeIds(),
+ logRequest.getVotingLog(),
receiver,
logRequest.getLeaderShipStale(),
logRequest.getNewLeaderTerm(),
logRequest.getAppendEntryRequest(),
logRequest.getQuorumSize());
Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
- logRequest.getLog().getCreateTime());
+ logRequest.getVotingLog().getLog().getCreateTime());
}
class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
@@ -427,9 +413,8 @@ public class LogDispatcher {
singleEntryHandlers = new ArrayList<>(batch.size());
for (SendLogRequest sendLogRequest : batch) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(
- sendLogRequest.getLog(),
- sendLogRequest.getVotedNodeIds(),
+ member.getAppendNodeEntryHandler(
+ sendLogRequest.getVotingLog(),
receiver,
sendLogRequest.getLeaderShipStale(),
sendLogRequest.getNewLeaderTerm(),
@@ -452,26 +437,6 @@ public class LogDispatcher {
singleEntryHandler.onError(e);
}
}
-
- private AppendNodeEntryHandler getAppendNodeEntryHandler(
- Log log,
- Set<Integer> voteCounter,
- Node node,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- Peer peer,
- int quorumSize) {
- AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
- handler.setReceiver(node);
- handler.setVotedNodeIds(voteCounter);
- handler.setLeaderShipStale(leaderShipStale);
- handler.setLog(log);
- handler.setMember(member);
- handler.setPeer(peer);
- handler.setReceiverTerm(newLeaderTerm);
- handler.setQuorumSize(quorumSize);
- return handler;
- }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
new file mode 100644
index 0000000..da9f4af
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import java.util.HashSet;
+import java.util.Set;
+
/**
 * A log entry paired with the ids of the nodes that have strongly accepted it, used by the
 * leader to decide when the entry is replicated on a quorum.
 */
public class VotingLog {
  // the entry being voted on
  protected Log log;
  // ids of nodes that strongly accepted the entry; at most one vote per member
  protected Set<Integer> stronglyAcceptedNodeIds;

  public VotingLog(Log log, int groupSize) {
    this.log = log;
    // presized to the group size, the maximum possible number of voters
    stronglyAcceptedNodeIds = new HashSet<>(groupSize);
  }

  public Log getLog() {
    return log;
  }

  public void setLog(Log log) {
    this.log = log;
  }

  public Set<Integer> getStronglyAcceptedNodeIds() {
    return stronglyAcceptedNodeIds;
  }

  public void setStronglyAcceptedNodeIds(Set<Integer> stronglyAcceptedNodeIds) {
    this.stronglyAcceptedNodeIds = stronglyAcceptedNodeIds;
  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
index dcb287d..0158b5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -152,7 +153,7 @@ public class LogCatchUpTask implements Callable<Boolean> {
}
try {
- long result = client.appendEntry(request);
+ AppendEntryResult result = client.appendEntry(request);
handler.onComplete(result);
return handler.getAppendSucceed().get();
} catch (TException e) {
@@ -315,7 +316,7 @@ public class LogCatchUpTask implements Callable<Boolean> {
return false;
}
try {
- long result = client.appendEntries(request);
+ AppendEntryResult result = client.appendEntries(request);
handler.onComplete(result);
return appendSucceed.get();
} catch (TException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
index e83857b..0b67fb5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.cluster.server.handlers.caller;
+import java.util.Set;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -32,7 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
/**
* AppendGroupEntryHandler checks if the log is successfully appended by the quorum or some node has
@@ -41,7 +43,7 @@ import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
* if the actually agreed nodes can be less than quorum, because the same nodes may say "yes" for
* multiple groups.
*/
-public class AppendGroupEntryHandler implements AsyncMethodCallback<Long> {
+public class AppendGroupEntryHandler implements AsyncMethodCallback<AppendEntryResult> {
private static final Logger logger = LoggerFactory.getLogger(AppendGroupEntryHandler.class);
@@ -52,7 +54,7 @@ public class AppendGroupEntryHandler implements AsyncMethodCallback<Long> {
// for example: assuming there are 4 nodes and 3 replicas, then the initial array will be:
// [2, 2, 2, 2]. And if node0 accepted the log, as node0 is in group 2,3,0, the array will be
// [1, 2, 1, 1].
- private int[] groupReceivedCounter;
+ private Set<Integer>[] groupReceivedNodeIds;
// the index of the node which the request sends log to, if the node accepts the log, all
// groups' counters the node is in should decrease
private int receiverNodeIndex;
@@ -60,48 +62,51 @@ public class AppendGroupEntryHandler implements AsyncMethodCallback<Long> {
// store the flag of leadership lost and the new leader's term
private AtomicBoolean leaderShipStale;
private AtomicLong newLeaderTerm;
+ private int quorumSize;
private int replicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
private AtomicInteger erroredNodeNum = new AtomicInteger(0);
public AppendGroupEntryHandler(
- int[] groupReceivedCounter,
+ Set<Integer>[] groupReceivedNodeIds,
int receiverNodeIndex,
Node receiverNode,
AtomicBoolean leaderShipStale,
Log log,
AtomicLong newLeaderTerm,
- RaftMember member) {
- this.groupReceivedCounter = groupReceivedCounter;
+ RaftMember member,
+ int quorumSize) {
+ this.groupReceivedNodeIds = groupReceivedNodeIds;
this.receiverNodeIndex = receiverNodeIndex;
this.receiverNode = receiverNode;
this.leaderShipStale = leaderShipStale;
this.log = log;
this.newLeaderTerm = newLeaderTerm;
this.member = member;
+ this.quorumSize = quorumSize;
}
@Override
- public void onComplete(Long response) {
+ public void onComplete(AppendEntryResult response) {
if (leaderShipStale.get()) {
// someone has rejected this log because the leadership is stale
return;
}
- long resp = response;
+ long resp = response.status;
- if (resp == RESPONSE_AGREE) {
+ if (resp == RESPONSE_STRONG_ACCEPT) {
processAgreement();
} else if (resp > 0) {
// a response > 0 is the term of the follower
- synchronized (groupReceivedCounter) {
+ synchronized (groupReceivedNodeIds) {
// the leader ship is stale, abort and wait for the new leader's heartbeat
long previousNewTerm = newLeaderTerm.get();
if (previousNewTerm < resp) {
newLeaderTerm.set(resp);
}
leaderShipStale.set(true);
- groupReceivedCounter.notifyAll();
+ groupReceivedNodeIds.notifyAll();
}
}
// rejected because the follower's logs are stale or the follower has no cluster info, just
@@ -113,28 +118,28 @@ public class AppendGroupEntryHandler implements AsyncMethodCallback<Long> {
* example. If all counters reach 0, wake the waiting thread to welcome the success.
*/
private void processAgreement() {
- synchronized (groupReceivedCounter) {
+ synchronized (groupReceivedNodeIds) {
logger.debug("{}: Node {} has accepted log {}", member.getName(), receiverNode, log);
// this node is contained in REPLICATION_NUM groups, decrease the counters of these groups
for (int i = 0; i < replicationNum; i++) {
int nodeIndex = receiverNodeIndex - i;
if (nodeIndex < 0) {
- nodeIndex += groupReceivedCounter.length;
+ nodeIndex += groupReceivedNodeIds.length;
}
- groupReceivedCounter[nodeIndex]--;
+ groupReceivedNodeIds[nodeIndex].add(receiverNode.nodeIdentifier);
}
// examine if all groups have agreed
boolean allAgreed = true;
- for (int remaining : groupReceivedCounter) {
- if (remaining > 0) {
+ for (Set<Integer> receivedNodeIds : groupReceivedNodeIds) {
+ if (receivedNodeIds.size() < quorumSize) {
allAgreed = false;
break;
}
}
if (allAgreed) {
// wake up the parent thread to welcome the new node
- groupReceivedCounter.notifyAll();
+ groupReceivedNodeIds.notifyAll();
}
}
}
@@ -147,10 +152,10 @@ public class AppendGroupEntryHandler implements AsyncMethodCallback<Long> {
receiverNode,
exception);
if (erroredNodeNum.incrementAndGet() >= replicationNum / 2) {
- synchronized (groupReceivedCounter) {
+ synchronized (groupReceivedNodeIds) {
logger.error(
"{}: Over half of the nodes failed, the request is rejected", member.getName());
- groupReceivedCounter.notifyAll();
+ groupReceivedNodeIds.notifyAll();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b233d20..bf87703 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -18,15 +18,13 @@
*/
package org.apache.iotdb.cluster.server.handlers.caller;
-import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
import java.net.ConnectException;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -47,14 +45,13 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
- private RaftMember member;
- private AtomicLong receiverTerm;
- private Log log;
- private Set<Integer> votedNodeIds;
- private AtomicBoolean leaderShipStale;
- private Node receiver;
- private Peer peer;
- private int quorumSize;
+ protected RaftMember member;
+ protected AtomicLong receiverTerm;
+ protected VotingLog log;
+ protected AtomicBoolean leaderShipStale;
+ protected Node receiver;
+ protected Peer peer;
+ protected int quorumSize;
// initialized as the quorum size, and decrease by 1 each time when we receive a rejection or
// an exception, upon decreased to zero, the request will be early-aborted
private int failedDecreasingCounter;
@@ -74,7 +71,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
if (Timer.ENABLE_INSTRUMENTING) {
Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
}
- if (votedNodeIds.contains(Integer.MAX_VALUE)) {
+ if (log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
// the request already failed
return;
}
@@ -84,10 +81,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
return;
}
long resp = response.status;
- synchronized (votedNodeIds) {
+ synchronized (log) {
if (resp == RESPONSE_STRONG_ACCEPT) {
- votedNodeIds.add(receiver.nodeIdentifier);
- int remaining = quorumSize - votedNodeIds.size();
+ log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
+ int remaining = quorumSize - log.getStronglyAcceptedNodeIds().size();
logger.debug(
"{}: Received an agreement from {} for {}, remaining votes to succeed: {}",
member.getName(),
@@ -98,11 +95,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
logger.debug(
"{}: Log [{}] {} is accepted by the quorum",
member.getName(),
- log.getCurrLogIndex(),
+ log.getLog(),
log);
- votedNodeIds.notifyAll();
+ log.notifyAll();
}
- peer.setMatchIndex(Math.max(log.getCurrLogIndex(), peer.getMatchIndex()));
+ peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
} else if (resp > 0) {
// a response > 0 is the follower's term
// the leader ship is stale, wait for the new leader's heartbeat
@@ -117,7 +114,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
receiverTerm.set(resp);
}
leaderShipStale.set(true);
- votedNodeIds.notifyAll();
+ log.notifyAll();
} else {
// e.g., Response.RESPONSE_LOG_MISMATCH
logger.debug(
@@ -145,17 +142,17 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
private void onFail() {
- synchronized (votedNodeIds) {
+ synchronized (log) {
failedDecreasingCounter--;
if (failedDecreasingCounter <= 0) {
// quorum members have failed, there is no need to wait for others
- votedNodeIds.add(Integer.MAX_VALUE);
- votedNodeIds.notifyAll();
+ log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
+ log.notifyAll();
}
}
}
- public void setLog(Log log) {
+ public void setLog(VotingLog log) {
this.log = log;
}
@@ -163,11 +160,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
this.member = member;
}
- public void setVotedNodeIds(Set<Integer> votedNodeIds) {
- this.votedNodeIds = votedNodeIds;
-
- }
-
public void setLeaderShipStale(AtomicBoolean leaderShipStale) {
this.leaderShipStale = leaderShipStale;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
index ddf3af1..eeda328 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.server.handlers.caller;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -36,7 +37,7 @@ import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
* LogCatchUpHandler checks the result of appending a log in a catch-up task and decides to abort
* the catch up or not.
*/
-public class LogCatchUpHandler implements AsyncMethodCallback<Long> {
+public class LogCatchUpHandler implements AsyncMethodCallback<AppendEntryResult> {
private static final Logger logger = LoggerFactory.getLogger(LogCatchUpHandler.class);
@@ -47,9 +48,9 @@ public class LogCatchUpHandler implements AsyncMethodCallback<Long> {
private RaftMember raftMember;
@Override
- public void onComplete(Long response) {
+ public void onComplete(AppendEntryResult response) {
logger.debug("{}: Received a catch-up result of {} from {}", memberName, log, follower);
- long resp = response;
+ long resp = response.status;
if (resp == RESPONSE_AGREE) {
synchronized (appendSucceed) {
appendSucceed.set(true);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java
index 07d2aa1..8a979ac 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.server.handlers.caller;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -33,7 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
-public class LogCatchUpInBatchHandler implements AsyncMethodCallback<Long> {
+public class LogCatchUpInBatchHandler implements AsyncMethodCallback<AppendEntryResult> {
private static final Logger logger = LoggerFactory.getLogger(LogCatchUpInBatchHandler.class);
@@ -44,11 +45,11 @@ public class LogCatchUpInBatchHandler implements AsyncMethodCallback<Long> {
private RaftMember raftMember;
@Override
- public void onComplete(Long response) {
+ public void onComplete(AppendEntryResult response) {
logger.debug(
"{}: Received a catch-up result size of {} from {}", memberName, logs.size(), follower);
- long resp = response;
+ long resp = response.status;
if (resp == RESPONSE_AGREE) {
synchronized (appendSucceed) {
appendSucceed.set(true);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 056a485..6bffeda 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -141,13 +141,19 @@ import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckRes
@SuppressWarnings("java:S1135")
public class MetaGroupMember extends RaftMember {
- /** the file that contains the identifier of this node */
+ /**
+ * the file that contains the identifier of this node
+ */
static final String NODE_IDENTIFIER_FILE_NAME =
IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "node_identifier";
- /** the file that contains the serialized partition table */
+ /**
+ * the file that contains the serialized partition table
+ */
static final String PARTITION_FILE_NAME =
IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "partitions";
- /** in case of data loss, some file changes would be made to a temporary file first */
+ /**
+ * in case of data loss, some file changes would be made to a temporary file first
+ */
private static final String TEMP_SUFFIX = ".tmp";
private static final Logger logger = LoggerFactory.getLogger(MetaGroupMember.class);
@@ -163,7 +169,9 @@ public class MetaGroupMember extends RaftMember {
*/
private static final int REPORT_INTERVAL_SEC = 10;
- /** how many times is a data record replicated, also the number of nodes in a data group */
+ /**
+ * how many times is a data record replicated, also the number of nodes in a data group
+ */
private static final int REPLICATION_NUM =
ClusterDescriptor.getInstance().getConfig().getReplicationNum();
@@ -190,9 +198,13 @@ public class MetaGroupMember extends RaftMember {
*/
private Map<Integer, Node> idNodeMap = null;
- /** nodes in the cluster and data partitioning */
+ /**
+ * nodes in the cluster and data partitioning
+ */
private PartitionTable partitionTable;
- /** router calculates the partition groups that a partitioned plan should be sent to */
+ /**
+ * router calculates the partition groups that a partitioned plan should be sent to
+ */
private ClusterPlanRouter router;
/**
* each node contains multiple DataGroupMembers and they are managed by a DataClusterServer acting
@@ -200,7 +212,9 @@ public class MetaGroupMember extends RaftMember {
*/
private DataClusterServer dataClusterServer;
- /** each node starts a data heartbeat server to transfer heartbeat requests */
+ /**
+ * each node starts a data heartbeat server to transfer heartbeat requests
+ */
private DataHeartbeatServer dataHeartbeatServer;
/**
@@ -223,7 +237,9 @@ public class MetaGroupMember extends RaftMember {
*/
private StartUpStatus startUpStatus;
- /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
+ /**
+ * hardLinkCleaner will periodically clean expired hardlinks created during snapshots
+ */
private ScheduledExecutorService hardLinkCleanerThread;
private Coordinator coordinator;
@@ -241,7 +257,8 @@ public class MetaGroupMember extends RaftMember {
}
@TestOnly
- public MetaGroupMember() {}
+ public MetaGroupMember() {
+ }
public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator coordinator)
throws QueryProcessException {
@@ -394,7 +411,7 @@ public class MetaGroupMember extends RaftMember {
Node node = ClusterUtils.parseNode(seedUrl);
if (node != null
&& (!node.getInternalIp().equals(thisNode.internalIp)
- || node.getMetaPort() != thisNode.getMetaPort())
+ || node.getMetaPort() != thisNode.getMetaPort())
&& !allNodes.contains(node)) {
// do not add the local node since it is added in the constructor
allNodes.add(node);
@@ -722,7 +739,9 @@ public class MetaGroupMember extends RaftMember {
blindNodes.add(node);
}
- /** @return whether a node wants the partition table. */
+ /**
+ * @return whether a node wants the partition table.
+ */
public boolean isNodeBlind(Node node) {
return blindNodes.contains(node);
}
@@ -735,7 +754,9 @@ public class MetaGroupMember extends RaftMember {
blindNodes.remove(node);
}
- /** Register the identifier for the node if it does not conflict with other nodes. */
+ /**
+ * Register the identifier for the node if it does not conflict with other nodes.
+ */
private void registerNodeIdentifier(Node node, int identifier) {
synchronized (idNodeMap) {
Node conflictNode = idNodeMap.get(identifier);
@@ -759,7 +780,9 @@ public class MetaGroupMember extends RaftMember {
idNodeMap.put(thisNode.getNodeIdentifier(), thisNode);
}
- /** @return Whether all nodes' identifier is known. */
+ /**
+ * @return Whether all nodes' identifier is known.
+ */
private boolean allNodesIdKnown() {
return idNodeMap != null && idNodeMap.size() == allNodes.size();
}
@@ -784,7 +807,9 @@ public class MetaGroupMember extends RaftMember {
logger.info("Sub-servers started.");
}
- /** When the node restarts, it sends handshakes to all other nodes so they may know it is back. */
+ /**
+ * When the node restarts, it sends handshakes to all other nodes so they may know it is back.
+ */
private void sendHandshake() {
for (Node node : allNodes) {
try {
@@ -838,9 +863,9 @@ public class MetaGroupMember extends RaftMember {
* immediately. If the identifier of "node" conflicts with an existing node, the request will be
* turned down.
*
- * @param node cannot be the local node
+ * @param node cannot be the local node
* @param startUpStatus the start up status of the new node
- * @param response the response that will be sent to "node"
+ * @param response the response that will be sent to "node"
* @return true if the process is over, false if the request should be forwarded
*/
private boolean processAddNodeLocally(
@@ -1098,12 +1123,15 @@ public class MetaGroupMember extends RaftMember {
AppendEntryRequest request = buildAppendEntryRequest(log, true);
// ask for votes from each node
- int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
+ // a group is considered successfully received the log if such members receive the log
+ int groupQuorum = REPLICATION_NUM / 2 + 1;
+ Set<Integer>[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log,
+ newLeaderTerm, groupQuorum);
if (!leaderShipStale.get()) {
// if all quorums of all groups have received this log, it is considered succeeded.
- for (int remaining : groupRemainings) {
- if (remaining > 0) {
+ for (Set<Integer> remaining : groupRemainings) {
+ if (remaining.size() < groupQuorum) {
return AppendLogResult.TIME_OUT;
}
}
@@ -1123,20 +1151,21 @@ public class MetaGroupMember extends RaftMember {
@SuppressWarnings({"java:S2445", "java:S2274"})
// groupRemaining is shared with the handlers,
// and we do not wait infinitely to enable timeouts
- private int[] askGroupVotes(
+ private Set<Integer>[] askGroupVotes(
List<Node> nodeRing,
AppendEntryRequest request,
AtomicBoolean leaderShipStale,
Log log,
- AtomicLong newLeaderTerm) {
+ AtomicLong newLeaderTerm,
+ int quorumSize) {
// each node will be the header of a group, we use the node to represent the group
int nodeSize = nodeRing.size();
// the decreasing counters of how many nodes in a group has received the log, each time a
// node receive the log, the counters of all groups it is in will decrease by 1
- int[] groupRemainings = new int[nodeSize];
- // a group is considered successfully received the log if such members receive the log
- int groupQuorum = REPLICATION_NUM / 2 + 1;
- Arrays.fill(groupRemainings, groupQuorum);
+ Set<Integer>[] groupRemainings = new Set[nodeSize];
+ for (int i = 0; i < groupRemainings.length; i++) {
+ groupRemainings[i] = new HashSet<>();
+ }
synchronized (groupRemainings) {
// ask a vote from every node
@@ -1150,11 +1179,11 @@ public class MetaGroupMember extends RaftMember {
if (groupIndex < 0) {
groupIndex += groupRemainings.length;
}
- groupRemainings[groupIndex]--;
+ groupRemainings[groupIndex].add(thisNode.nodeIdentifier);
}
} else {
askRemoteGroupVote(
- node, groupRemainings, i, leaderShipStale, log, newLeaderTerm, request);
+ node, groupRemainings, i, leaderShipStale, log, newLeaderTerm, request, quorumSize);
}
}
@@ -1170,15 +1199,17 @@ public class MetaGroupMember extends RaftMember {
private void askRemoteGroupVote(
Node node,
- int[] groupRemainings,
+ Set<Integer>[] groupRemainings,
int nodeIndex,
AtomicBoolean leaderShipStale,
Log log,
AtomicLong newLeaderTerm,
- AppendEntryRequest request) {
+ AppendEntryRequest request,
+ int quorumSize) {
AppendGroupEntryHandler handler =
new AppendGroupEntryHandler(
- groupRemainings, nodeIndex, node, leaderShipStale, log, newLeaderTerm, this);
+ groupRemainings, nodeIndex, node, leaderShipStale, log, newLeaderTerm, this,
+ quorumSize);
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node);
try {
@@ -1224,7 +1255,9 @@ public class MetaGroupMember extends RaftMember {
}
}
- /** Load the partition table from a local file if it can be found. */
+ /**
+ * Load the partition table from a local file if it can be found.
+ */
private void loadPartitionTable() {
File partitionFile = new File(PARTITION_FILE_NAME);
if (!partitionFile.exists() && !recoverPartitionTableFile()) {
@@ -1329,7 +1362,9 @@ public class MetaGroupMember extends RaftMember {
thisNode.getInternalIp(), thisNode.getMetaPort(), System.currentTimeMillis());
}
- /** Set the node's identifier to "identifier", also save it to a local file in text format. */
+ /**
+ * Set the node's identifier to "identifier", also save it to a local file in text format.
+ */
private void setNodeIdentifier(int identifier) {
logger.info("The identifier of this node has been set to {}", identifier);
thisNode.setNodeIdentifier(identifier);
@@ -1435,7 +1470,7 @@ public class MetaGroupMember extends RaftMember {
* obtaining partition group based on path and intervals
*
* @param intervals time intervals, include minimum and maximum value
- * @param path partial path
+ * @param path partial path
* @return data partition on which the current interval of the current path is stored
* @throws StorageEngineException if Failed to get storage group path
*/
@@ -1634,7 +1669,7 @@ public class MetaGroupMember extends RaftMember {
case TIME_OUT:
logger.info("Removal request of {} timed out", target);
break;
- // retry
+ // retry
case LEADERSHIP_STALE:
default:
return Response.RESPONSE_NULL;
@@ -1761,9 +1796,9 @@ public class MetaGroupMember extends RaftMember {
/**
* Get a local DataGroupMember that is in the group of "header" and should process "request".
*
- * @param header the header of the group which the local node is in
+ * @param header the header of the group which the local node is in
* @param request the toString() of this parameter should explain what the request is and it is
- * only used in logs for tracing
+ * only used in logs for tracing
*/
public DataGroupMember getLocalDataMember(Node header, Object request) {
return dataClusterServer.getDataMember(header, null, request);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 666e989..f78e3d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,9 +19,31 @@
package org.apache.iotdb.cluster.server.member;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncClientPool;
@@ -39,6 +61,7 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
@@ -81,39 +104,11 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
-import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.ConcurrentModificationException;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* RaftMember process the common raft logic like leader election, log appending, catch-up and so
* on.
@@ -694,8 +689,7 @@ public abstract class RaftMember {
}
public void sendLogAsync(
- Log log,
- Set<Integer> votedNodeIds,
+ VotingLog log,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
@@ -706,7 +700,7 @@ public abstract class RaftMember {
AsyncClient client = getSendLogAsyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, peer
+ getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer
, quorumSize);
try {
if (indirectReceivers == null || indirectReceivers.isEmpty()) {
@@ -1143,12 +1137,12 @@ public abstract class RaftMember {
try {
AppendLogResult appendLogResult =
waitAppendResult(
- sendLogRequest.getVotedNodeIds(),
+ sendLogRequest.getVotingLog(),
sendLogRequest.getLeaderShipStale(),
sendLogRequest.getNewLeaderTerm(),
sendLogRequest.getQuorumSize());
Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
- sendLogRequest.getLog().getCreateTime());
+ sendLogRequest.getVotingLog().getLog().getCreateTime());
switch (appendLogResult) {
case OK:
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
@@ -1171,7 +1165,7 @@ public abstract class RaftMember {
}
public SendLogRequest buildSendLogRequest(Log log) {
- Set<Integer> votedNodeIds = new HashSet<>(allNodes.size());
+ VotingLog votingLog = buildVotingLog(log);
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(term.get());
@@ -1179,10 +1173,14 @@ public abstract class RaftMember {
AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
- return new SendLogRequest(log, votedNodeIds, leaderShipStale, newLeaderTerm,
+ return new SendLogRequest(votingLog, leaderShipStale, newLeaderTerm,
appendEntryRequest, allNodes.size() / 2);
}
+ protected VotingLog buildVotingLog(Log log) {
+ return new VotingLog(log, allNodes.size());
+ }
+
/**
* The maximum time to wait if there is no leader in the group, after which a
* LeadNotFoundException will be thrown.
@@ -1536,18 +1534,18 @@ public abstract class RaftMember {
*/
@SuppressWarnings({"java:S2445"}) // safe synchronized
private AppendLogResult waitAppendResult(
- Set<Integer> votedNodeIds, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm,
+ VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm,
int quorumSize) {
// wait for the followers to vote
long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
- synchronized (votedNodeIds) {
+ synchronized (log) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
- while (votedNodeIds.size() >= quorumSize
+ while (log.getStronglyAcceptedNodeIds().size() >= quorumSize
&& alreadyWait < RaftServer.getWriteOperationTimeoutMS()
- && !votedNodeIds.contains(Integer.MAX_VALUE)) {
+ && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
try {
- votedNodeIds.wait(RaftServer.getWriteOperationTimeoutMS());
+ log.wait(RaftServer.getWriteOperationTimeoutMS());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when sending a log", e);
@@ -1568,7 +1566,7 @@ public abstract class RaftMember {
}
// cannot get enough agreements within a certain amount of time
- if (votedNodeIds.size() < quorumSize) {
+ if (log.getStronglyAcceptedNodeIds().size() < quorumSize) {
return AppendLogResult.TIME_OUT;
}
@@ -1786,11 +1784,11 @@ public abstract class RaftMember {
private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
if (requiredQuorum <= 0) {
// use half of the members' size as the quorum
- return sendLogToFollowers(log, new HashSet<>(), allNodes.size() / 2);
+ return sendLogToFollowers(buildVotingLog(log), allNodes.size() / 2);
} else {
// make sure quorum does not exceed the number of members - 1
return sendLogToFollowers(
- log, new HashSet<>(), Math.min(requiredQuorum, allNodes.size() - 1));
+ buildVotingLog(log), Math.min(requiredQuorum, allNodes.size() - 1));
}
}
@@ -1800,10 +1798,10 @@ public abstract class RaftMember {
* than the local term, retire from leader and return a LEADERSHIP_STALE. If "voteCounter" is
* still positive after a certain time, return TIME_OUT.
*
- * @param votedNodeIds a set of voted nodes' identifiers
+ * @param log an entry that is to be voted for
* @return an AppendLogResult indicating a success or a failure and why
*/
- private AppendLogResult sendLogToFollowers(Log log, Set<Integer> votedNodeIds, int quorumSize) {
+ private AppendLogResult sendLogToFollowers(VotingLog log, int quorumSize) {
if (allNodes.size() == 1) {
// single node group, does not need the agreement of others
return AppendLogResult.OK;
@@ -1815,7 +1813,7 @@ public abstract class RaftMember {
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(term.get());
- AppendEntryRequest request = buildAppendEntryRequest(log, true);
+ AppendEntryRequest request = buildAppendEntryRequest(log.getLog(), true);
try {
if (allNodes.size() > 2) {
@@ -1825,7 +1823,7 @@ public abstract class RaftMember {
appendLogThreadPool.submit(
() ->
sendLogToFollower(
- log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, quorumSize));
+ log, node, leaderShipStale, newLeaderTerm, request, quorumSize));
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
}
@@ -1834,7 +1832,7 @@ public abstract class RaftMember {
// there is only one member, send to it within this thread to reduce thread switching
// overhead
for (Node node : allNodes) {
- sendLogToFollower(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, quorumSize);
+ sendLogToFollower(log, node, leaderShipStale, newLeaderTerm, request, quorumSize);
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
}
@@ -1846,18 +1844,17 @@ public abstract class RaftMember {
return AppendLogResult.TIME_OUT;
}
- return waitAppendResult(votedNodeIds, leaderShipStale, newLeaderTerm, quorumSize);
+ return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
}
public void sendLogToFollower(
- Log log,
- Set<Integer> votedNodeIds,
+ VotingLog log,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
int quorumSize) {
- sendLogToFollower(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request,
+ sendLogToFollower(log, node, leaderShipStale, newLeaderTerm, request,
quorumSize, Collections.emptyList());
}
@@ -1865,8 +1862,7 @@ public abstract class RaftMember {
* Send "log" to "node".
*/
public void sendLogToFollower(
- Log log,
- Set<Integer> votedNodeIds,
+ VotingLog log,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
@@ -1882,7 +1878,7 @@ public abstract class RaftMember {
*/
long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
Peer peer = peerMap.computeIfAbsent(node, k -> new Peer(logManager.getLastLogIndex()));
- if (!waitForPrevLog(peer, log)) {
+ if (!waitForPrevLog(peer, log.getLog())) {
logger.warn("{}: node {} timed out when appending {}", name, node, log);
return;
}
@@ -1893,10 +1889,10 @@ public abstract class RaftMember {
}
if (config.isUseAsyncServer()) {
- sendLogAsync(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, peer,
+ sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, peer,
quorumSize, indirectReceivers);
} else {
- sendLogSync(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, peer,
+ sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, peer,
quorumSize, indirectReceivers);
}
}
@@ -1930,8 +1926,7 @@ public abstract class RaftMember {
}
private void sendLogSync(
- Log log,
- Set<Integer> votedNodeIds,
+ VotingLog log,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
@@ -1940,7 +1935,7 @@ public abstract class RaftMember {
Client client = getSyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, peer
+ getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer
, quorumSize);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
@@ -1964,8 +1959,7 @@ public abstract class RaftMember {
}
public AppendNodeEntryHandler getAppendNodeEntryHandler(
- Log log,
- Set<Integer> votedNodeIds,
+ VotingLog log,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
@@ -1973,7 +1967,6 @@ public abstract class RaftMember {
int quorumSize) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setReceiver(node);
- handler.setVotedNodeIds(votedNodeIds);
handler.setLeaderShipStale(leaderShipStale);
handler.setLog(log);
handler.setMember(this);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index c5461dc..41eb023 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.common;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
@@ -207,8 +208,8 @@ public class TestAsyncDataClient extends AsyncDataClient {
public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {}
@Override
- public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
- new Thread(() -> resultHandler.onComplete(BaseMember.dummyResponse.get())).start();
+ public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+ new Thread(() -> resultHandler.onComplete(new AppendEntryResult(BaseMember.dummyResponse.get()))).start();
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index 281e084..46b738e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -70,7 +71,7 @@ public class LogDispatcherTest {
return new TestAsyncClient() {
@Override
public void appendEntry(
- AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(
() -> {
if (!downNode.contains(node)) {
@@ -86,7 +87,7 @@ public class LogDispatcherTest {
@Override
public void appendEntries(
- AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(
() -> {
if (!downNode.contains(node)) {
@@ -106,24 +107,24 @@ public class LogDispatcherTest {
public Client getSyncClient(Node node) {
return new TestSyncClient() {
@Override
- public long appendEntry(AppendEntryRequest request) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) throws TException {
try {
if (!downNode.contains(node)) {
return mockedAppendEntry(request);
}
- return -1;
+ return new AppendEntryResult(-1);
} catch (UnknownLogTypeException e) {
throw new TException(e);
}
}
@Override
- public long appendEntries(AppendEntriesRequest request) throws TException {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) throws TException {
try {
if (!downNode.contains(node)) {
return mockedAppendEntries(request);
}
- return -1;
+ return new AppendEntryResult(-1);
} catch (UnknownLogTypeException e) {
throw new TException(e);
}
@@ -139,14 +140,14 @@ public class LogDispatcherTest {
raftMember.setCharacter(NodeCharacter.LEADER);
}
- private long mockedAppendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+ private AppendEntryResult mockedAppendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
LogParser logParser = LogParser.getINSTANCE();
Log parse = logParser.parse(request.entry.duplicate());
appendedEntries.computeIfAbsent(parse, p -> new AtomicInteger()).incrementAndGet();
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
- private long mockedAppendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
+ private AppendEntryResult mockedAppendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
List<ByteBuffer> entries = request.getEntries();
List<Log> logs = new ArrayList<>();
for (ByteBuffer entry : entries) {
@@ -157,7 +158,7 @@ public class LogDispatcherTest {
for (Log log : logs) {
appendedEntries.computeIfAbsent(log, p -> new AtomicInteger()).incrementAndGet();
}
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
@Test
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index 9e2f2cb..9f88ce3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -76,12 +77,12 @@ public class CatchUpTaskTest {
public Client getSyncClient(Node node) {
return new TestSyncClient() {
@Override
- public long appendEntry(AppendEntryRequest request) {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) {
return dummyAppendEntry(request);
}
@Override
- public long appendEntries(AppendEntriesRequest request) {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) {
return dummyAppendEntries(request);
}
@@ -107,13 +108,13 @@ public class CatchUpTaskTest {
return new TestAsyncClient() {
@Override
public void appendEntry(
- AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(() -> resultHandler.onComplete(dummyAppendEntry(request))).start();
}
@Override
public void appendEntries(
- AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(() -> resultHandler.onComplete(dummyAppendEntries(request))).start();
}
@@ -137,38 +138,38 @@ public class CatchUpTaskTest {
}
};
- private long dummyAppendEntry(AppendEntryRequest request) {
+ private AppendEntryResult dummyAppendEntry(AppendEntryRequest request) {
Log log = receivedLogs.get(receivedLogs.size() - 1);
Log testLog;
try {
testLog = LogParser.getINSTANCE().parse(request.entry);
} catch (Exception e) {
- return Response.RESPONSE_NULL;
+ return new AppendEntryResult(Response.RESPONSE_NULL);
}
if (testLog.getCurrLogIndex() == log.getCurrLogIndex() + 1) {
leaderCommit = Math.max(request.leaderCommit, leaderCommit);
receivedLogs.add(testLog);
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
if (testLog.getCurrLogIndex() == log.getCurrLogIndex()) {
leaderCommit = Math.max(request.leaderCommit, leaderCommit);
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
- return Response.RESPONSE_LOG_MISMATCH;
+ return new AppendEntryResult(Response.RESPONSE_LOG_MISMATCH);
}
- private long dummyAppendEntries(AppendEntriesRequest request) {
+ private AppendEntryResult dummyAppendEntries(AppendEntriesRequest request) {
for (ByteBuffer byteBuffer : request.getEntries()) {
Log testLog;
try {
testLog = LogParser.getINSTANCE().parse(byteBuffer);
} catch (Exception e) {
- return Response.RESPONSE_NULL;
+ return new AppendEntryResult(Response.RESPONSE_NULL);
}
receivedLogs.add(testLog);
}
leaderCommit = Math.max(request.leaderCommit, leaderCommit);
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
private boolean dummyMatchTerm(long index, long term) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
index 13839ca..f6a368d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -74,7 +75,7 @@ public class LogCatchUpTaskTest {
return new TestAsyncClient() {
@Override
public void appendEntry(
- AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(
() -> {
try {
@@ -88,7 +89,7 @@ public class LogCatchUpTaskTest {
@Override
public void appendEntries(
- AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(
() -> {
try {
@@ -106,7 +107,7 @@ public class LogCatchUpTaskTest {
public Client getSyncClient(Node node) {
return new TestSyncClient() {
@Override
- public long appendEntry(AppendEntryRequest request) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) throws TException {
try {
return dummyAppendEntry(request);
} catch (UnknownLogTypeException e) {
@@ -115,7 +116,7 @@ public class LogCatchUpTaskTest {
}
@Override
- public long appendEntries(AppendEntriesRequest request) throws TException {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) throws TException {
try {
return dummyAppendEntries(request);
} catch (UnknownLogTypeException e) {
@@ -131,17 +132,17 @@ public class LogCatchUpTaskTest {
}
};
- private long dummyAppendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+ private AppendEntryResult dummyAppendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
LogParser parser = LogParser.getINSTANCE();
Log testLog = parser.parse(request.entry);
receivedLogs.add(testLog);
if (testLeadershipFlag && testLog.getCurrLogIndex() == 4) {
sender.setCharacter(NodeCharacter.ELECTOR);
}
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
- private long dummyAppendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
+ private AppendEntryResult dummyAppendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
LogParser parser = LogParser.getINSTANCE();
Log testLog;
for (ByteBuffer byteBuffer : request.getEntries()) {
@@ -149,11 +150,11 @@ public class LogCatchUpTaskTest {
receivedLogs.add(testLog);
if (testLog != null && testLeadershipFlag && testLog.getCurrLogIndex() >= 1023) {
// return a larger term to indicate that the leader has changed
- return sender.getTerm().get() + 1;
+ return new AppendEntryResult(sender.getTerm().get() + 1);
}
}
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
@Before
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
index 52ab7a4..4af2ee2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -78,7 +79,7 @@ public class SnapshotCatchUpTaskTest {
return new TestAsyncClient() {
@Override
public void appendEntry(
- AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(() -> resultHandler.onComplete(dummyAppendEntry(request))).start();
}
@@ -102,7 +103,7 @@ public class SnapshotCatchUpTaskTest {
}
return new TestSyncClient() {
@Override
- public long appendEntry(AppendEntryRequest request) {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) {
return dummyAppendEntry(request);
}
@@ -119,11 +120,11 @@ public class SnapshotCatchUpTaskTest {
}
};
- private long dummyAppendEntry(AppendEntryRequest request) {
+ private AppendEntryResult dummyAppendEntry(AppendEntryRequest request) {
TestLog testLog = new TestLog();
testLog.deserialize(request.entry);
receivedLogs.add(testLog);
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
private void dummySendSnapshot(SendSnapshotRequest request) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
index 7584fd7..72dd990 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
@@ -19,16 +19,20 @@
package org.apache.iotdb.cluster.server.handlers.caller;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.iotdb.cluster.common.TestException;
import org.apache.iotdb.cluster.common.TestLog;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.checkerframework.checker.units.qual.A;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -65,9 +69,9 @@ public class AppendGroupEntryHandlerTest {
@Test
public void testAgreement() throws InterruptedException {
- int[] groupReceivedCounter = new int[10];
+ Set<Integer>[] groupReceivedCounter = new Set[10];
for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = REPLICATION_NUM / 2;
+ groupReceivedCounter[i] = new HashSet<>();
}
AtomicBoolean leadershipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(-1);
@@ -82,8 +86,9 @@ public class AppendGroupEntryHandlerTest {
leadershipStale,
testLog,
newLeaderTerm,
- member);
- new Thread(() -> handler.onComplete(Response.RESPONSE_AGREE)).start();
+ member,
+ REPLICATION_NUM / 2);
+ new Thread(() -> handler.onComplete(new AppendEntryResult(Response.RESPONSE_AGREE))).start();
}
groupReceivedCounter.wait();
}
@@ -96,9 +101,9 @@ public class AppendGroupEntryHandlerTest {
@Test
public void testNoAgreement() throws InterruptedException {
- int[] groupReceivedCounter = new int[10];
+ Set<Integer>[] groupReceivedCounter = new Set[10];
for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = REPLICATION_NUM;
+ groupReceivedCounter[i] = new HashSet<>();
}
AtomicBoolean leadershipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(-1);
@@ -113,8 +118,9 @@ public class AppendGroupEntryHandlerTest {
leadershipStale,
testLog,
newLeaderTerm,
- member);
- handler.onComplete(Response.RESPONSE_AGREE);
+ member,
+ REPLICATION_NUM / 2);
+ handler.onComplete(new AppendEntryResult(Response.RESPONSE_AGREE));
}
}
for (int i = 0; i < 10; i++) {
@@ -130,9 +136,9 @@ public class AppendGroupEntryHandlerTest {
@Test
public void testLeadershipStale() throws InterruptedException {
- int[] groupReceivedCounter = new int[10];
+ Set<Integer>[] groupReceivedCounter = new Set[10];
for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = REPLICATION_NUM / 2;
+ groupReceivedCounter[i] = new HashSet<>();
}
AtomicBoolean leadershipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(-1);
@@ -146,8 +152,9 @@ public class AppendGroupEntryHandlerTest {
leadershipStale,
testLog,
newLeaderTerm,
- member);
- new Thread(() -> handler.onComplete(100L)).start();
+ member,
+ REPLICATION_NUM / 2);
+ new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
groupReceivedCounter.wait();
}
for (int i = 0; i < 10; i++) {
@@ -159,9 +166,9 @@ public class AppendGroupEntryHandlerTest {
@Test
public void testError() throws InterruptedException {
- int[] groupReceivedCounter = new int[10];
+ Set<Integer>[] groupReceivedCounter = new Set[10];
for (int i = 0; i < 10; i++) {
- groupReceivedCounter[i] = REPLICATION_NUM / 2;
+ groupReceivedCounter[i] = new HashSet<>();
}
AtomicBoolean leadershipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(-1);
@@ -175,7 +182,8 @@ public class AppendGroupEntryHandlerTest {
leadershipStale,
testLog,
newLeaderTerm,
- member);
+ member,
+ REPLICATION_NUM / 2);
handler.onError(new TestException());
for (int i = 0; i < 10; i++) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 39a0457..7fcfcc2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -19,30 +19,29 @@
package org.apache.iotdb.cluster.server.handlers.caller;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.common.TestException;
import org.apache.iotdb.cluster.common.TestLog;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.VotingLog;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class AppendNodeEntryHandlerTest {
private RaftMember member;
@@ -68,26 +67,27 @@ public class AppendNodeEntryHandlerTest {
int replicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
try {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
- AtomicInteger quorum = new AtomicInteger(5);
+ VotingLog votingLog = new VotingLog(log, 10);
Peer peer = new Peer(1);
- synchronized (quorum) {
+ synchronized (votingLog) {
for (int i = 0; i < 10; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVotedNodeIds(quorum);
- handler.setLog(log);
+ handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
handler.setReceiver(TestUtils.getNode(i));
handler.setPeer(peer);
long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
- new Thread(() -> handler.onComplete(resp)).start();
+ AppendEntryResult result = new AppendEntryResult();
+ result.setStatus(resp);
+ new Thread(() -> handler.onComplete(result)).start();
}
- quorum.wait();
+ votingLog.wait();
}
assertEquals(-1, receiverTerm.get());
assertFalse(leadershipStale.get());
- assertEquals(0, quorum.get());
+ assertEquals(5, votingLog.getStronglyAcceptedNodeIds().size());
} finally {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(replicationNum);
}
@@ -98,24 +98,25 @@ public class AppendNodeEntryHandlerTest {
AtomicLong receiverTerm = new AtomicLong(-1);
AtomicBoolean leadershipStale = new AtomicBoolean(false);
Log log = new TestLog();
- AtomicInteger quorum = new AtomicInteger(5);
+ VotingLog votingLog = new VotingLog(log, 10);
Peer peer = new Peer(1);
for (int i = 0; i < 3; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVotedNodeIds(quorum);
- handler.setLog(log);
+ handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
handler.setReceiver(TestUtils.getNode(i));
handler.setPeer(peer);
- handler.onComplete(Response.RESPONSE_AGREE);
+ AppendEntryResult result = new AppendEntryResult();
+ result.setStatus(Response.RESPONSE_AGREE);
+ handler.onComplete(result);
}
assertEquals(-1, receiverTerm.get());
assertFalse(leadershipStale.get());
- assertEquals(2, quorum.get());
+ assertEquals(3, votingLog.getStronglyAcceptedNodeIds().size());
}
@Test
@@ -123,24 +124,23 @@ public class AppendNodeEntryHandlerTest {
AtomicLong receiverTerm = new AtomicLong(-1);
AtomicBoolean leadershipStale = new AtomicBoolean(false);
Log log = new TestLog();
- AtomicInteger quorum = new AtomicInteger(5);
+ VotingLog votingLog = new VotingLog(log, 10);
Peer peer = new Peer(1);
- synchronized (quorum) {
+ synchronized (votingLog) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVotedNodeIds(quorum);
- handler.setLog(log);
+ handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
handler.setReceiver(TestUtils.getNode(0));
handler.setPeer(peer);
- new Thread(() -> handler.onComplete(100L)).start();
- quorum.wait();
+ new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
+ votingLog.wait();
}
assertEquals(100, receiverTerm.get());
assertTrue(leadershipStale.get());
- assertEquals(5, quorum.get());
+ assertEquals(0, votingLog.getStronglyAcceptedNodeIds().size());
}
@Test
@@ -151,13 +151,12 @@ public class AppendNodeEntryHandlerTest {
int replicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
try {
- AtomicInteger quorum = new AtomicInteger(5);
+ VotingLog votingLog = new VotingLog(log, 10);
Peer peer = new Peer(1);
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVotedNodeIds(quorum);
- handler.setLog(log);
+ handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
handler.setReceiver(TestUtils.getNode(0));
@@ -166,7 +165,7 @@ public class AppendNodeEntryHandlerTest {
assertEquals(-1, receiverTerm.get());
assertFalse(leadershipStale.get());
- assertEquals(5, quorum.get());
+ assertEquals(0, votingLog.getStronglyAcceptedNodeIds().size());
} finally {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(replicationNum);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java
index acbab1b..5241326 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.common.TestLog;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -67,7 +68,7 @@ public class LogCatchUpHandlerTest {
handler.setLog(log);
handler.setRaftMember(member);
synchronized (appendSucceed) {
- new Thread(() -> handler.onComplete(Response.RESPONSE_AGREE)).start();
+ new Thread(() -> handler.onComplete(new AppendEntryResult(Response.RESPONSE_AGREE))).start();
appendSucceed.wait();
}
assertTrue(appendSucceed.get());
@@ -84,7 +85,7 @@ public class LogCatchUpHandlerTest {
handler.setLog(log);
handler.setRaftMember(member);
synchronized (appendSucceed) {
- new Thread(() -> handler.onComplete(Response.RESPONSE_LOG_MISMATCH)).start();
+ new Thread(() -> handler.onComplete(new AppendEntryResult(Response.RESPONSE_LOG_MISMATCH))).start();
appendSucceed.wait();
}
assertTrue(appendSucceed.get());
@@ -101,7 +102,7 @@ public class LogCatchUpHandlerTest {
handler.setLog(log);
handler.setRaftMember(member);
synchronized (appendSucceed) {
- new Thread(() -> handler.onComplete(100L)).start();
+ new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
appendSucceed.wait();
}
assertFalse(appendSucceed.get());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 90cfeab..2c84ca1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -410,13 +411,13 @@ public class MetaGroupMemberTest extends BaseMember {
@Override
public void appendEntry(
- AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
new Thread(
() -> {
long resp = dummyResponse.get();
// MIN_VALUE means let the request time out
if (resp != Long.MIN_VALUE) {
- resultHandler.onComplete(dummyResponse.get());
+ resultHandler.onComplete(new AppendEntryResult(resp));
}
})
.start();