You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/08 12:53:16 UTC
[iotdb] branch expr updated: change response of appending entries
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new 8a08f17 change response of appending entries
8a08f17 is described below
commit 8a08f17d445c2e08325ed3a25a8b0c6cad276ae8
Author: jt <jt...@163.com>
AuthorDate: Tue Jun 8 20:52:22 2021 +0800
change response of appending entries
---
.../iotdb/cluster/expr/ExprLogDispatcher.java | 330 ---------------------
.../org/apache/iotdb/cluster/expr/ExprMember.java | 25 +-
.../iotdb/cluster/log/IndirectLogDispatcher.java | 2 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 71 +++--
.../iotdb/cluster/server/DataClusterServer.java | 38 ++-
.../iotdb/cluster/server/MetaClusterServer.java | 20 +-
.../org/apache/iotdb/cluster/server/Response.java | 4 +-
.../handlers/caller/AppendNodeEntryHandler.java | 61 ++--
.../handlers/forwarder/IndirectAppendHandler.java | 5 +-
.../iotdb/cluster/server/member/RaftMember.java | 158 ++++++----
.../cluster/server/service/BaseAsyncService.java | 20 +-
.../cluster/server/service/BaseSyncService.java | 10 +-
.../cluster/server/service/MetaSyncService.java | 5 +-
.../caller/AppendNodeEntryHandlerTest.java | 8 +-
.../iotdb/cluster/server/member/BaseMember.java | 5 +-
.../cluster/server/member/RaftMemberTest.java | 9 +-
thrift-cluster/src/main/thrift/cluster.thrift | 14 +-
17 files changed, 257 insertions(+), 528 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
deleted file mode 100644
index 4b302a7..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.expr;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.client.sync.SyncClientPool;
-import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
-import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
-import org.apache.iotdb.cluster.server.monitor.Timer;
-import org.apache.iotdb.cluster.utils.ClientUtils;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
- * followers and send the logs in an ordered manner so that the followers will not wait for previous
- * logs for too long. For example: if the leader send 3 logs, log1, log2, log3, concurrently to
- * follower A, the actual reach order may be log3, log2, and log1. According to the protocol, log3
- * and log2 must halt until log1 reaches, as a result, the total delay may increase significantly.
- */
-public class ExprLogDispatcher {
-
- private static final Logger logger = LoggerFactory.getLogger(ExprLogDispatcher.class);
- private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
- private ExecutorService executorService;
- private static ExecutorService serializationService =
- Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
- private SyncClientPool clientPool;
- private Node leader;
-
- public ExprLogDispatcher(List<Node> nodes, Node leader) {
- executorService = Executors.newCachedThreadPool();
- for (Node node : nodes) {
- nodeLogQueues.add(createQueueAndBindingThread(node));
- }
- clientPool = new SyncClientPool(new FactorySync(new Factory()));
- this.leader = leader;
- }
-
- @TestOnly
- public void close() throws InterruptedException {
- executorService.shutdownNow();
- executorService.awaitTermination(10, TimeUnit.SECONDS);
- }
-
- public void offer(SendLogRequest log) {
- // do serialization here to avoid taking LogManager for too long
- if (!nodeLogQueues.isEmpty()) {
- log.serializedLogFuture =
- serializationService.submit(
- () -> {
- ByteBuffer byteBuffer = log.getLog().serialize();
- log.getLog().setByteSize(byteBuffer.array().length);
- return byteBuffer;
- });
- }
- for (int i = 0; i < nodeLogQueues.size(); i++) {
- BlockingQueue<SendLogRequest> nodeLogQueue = nodeLogQueues.get(i);
- try {
- boolean addSucceeded;
- if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
- addSucceeded =
- nodeLogQueue.offer(
- log,
- ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
- TimeUnit.MILLISECONDS);
- } else {
- addSucceeded = nodeLogQueue.add(log);
- }
-
- if (addSucceeded) {
- log.setEnqueueTime(System.nanoTime());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
- BlockingQueue<SendLogRequest> logBlockingQueue =
- new ArrayBlockingQueue<>(
- ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
- int bindingThreadNum = 1;
- for (int i = 0; i < bindingThreadNum; i++) {
- executorService.submit(new DispatcherThread(node, logBlockingQueue));
- }
- return logBlockingQueue;
- }
-
- public static class SendLogRequest {
-
- private Log log;
- private AtomicInteger voteCounter;
- private AtomicBoolean leaderShipStale;
- private AtomicLong newLeaderTerm;
- private AppendEntryRequest appendEntryRequest;
- private long enqueueTime;
- private Future<ByteBuffer> serializedLogFuture;
-
- public SendLogRequest(
- Log log,
- AtomicInteger voteCounter,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- AppendEntryRequest appendEntryRequest) {
- this.setLog(log);
- this.setVoteCounter(voteCounter);
- this.setLeaderShipStale(leaderShipStale);
- this.setNewLeaderTerm(newLeaderTerm);
- this.setAppendEntryRequest(appendEntryRequest);
- }
-
- public AtomicInteger getVoteCounter() {
- return voteCounter;
- }
-
- public void setVoteCounter(AtomicInteger voteCounter) {
- this.voteCounter = voteCounter;
- }
-
- public Log getLog() {
- return log;
- }
-
- public void setLog(Log log) {
- this.log = log;
- }
-
- public long getEnqueueTime() {
- return enqueueTime;
- }
-
- public void setEnqueueTime(long enqueueTime) {
- this.enqueueTime = enqueueTime;
- }
-
- public AtomicBoolean getLeaderShipStale() {
- return leaderShipStale;
- }
-
- public void setLeaderShipStale(AtomicBoolean leaderShipStale) {
- this.leaderShipStale = leaderShipStale;
- }
-
- public AtomicLong getNewLeaderTerm() {
- return newLeaderTerm;
- }
-
- void setNewLeaderTerm(AtomicLong newLeaderTerm) {
- this.newLeaderTerm = newLeaderTerm;
- }
-
- public AppendEntryRequest getAppendEntryRequest() {
- return appendEntryRequest;
- }
-
- public void setAppendEntryRequest(AppendEntryRequest appendEntryRequest) {
- this.appendEntryRequest = appendEntryRequest;
- }
-
- @Override
- public String toString() {
- return "SendLogRequest{" + "log=" + log + '}';
- }
- }
-
- class DispatcherThread implements Runnable {
-
- private Node node;
- private Client client;
- private BlockingQueue<SendLogRequest> logBlockingDeque;
- private List<SendLogRequest> currBatch = new ArrayList<>();
-
- DispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingDeque) {
- this.client = clientPool.getClient(node);
- this.logBlockingDeque = logBlockingDeque;
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("LogDispatcher-" + node);
- try {
- while (!Thread.interrupted()) {
- SendLogRequest poll = logBlockingDeque.take();
- currBatch.add(poll);
- logBlockingDeque.drainTo(currBatch);
- if (logger.isDebugEnabled()) {
- logger.debug("Sending {} logs to {}", currBatch.size(), node);
- }
- for (SendLogRequest request : currBatch) {
- request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
- }
- sendBatchLogs(currBatch);
- currBatch.clear();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- logger.error("Unexpected error in log dispatcher", e);
- }
- logger.info("Dispatcher exits");
- }
-
- private void appendEntriesSync(
- List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch) {
-
- long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
-
- startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
- try {
- long result = client.appendEntries(request);
- Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
- if (result != -1 && logger.isInfoEnabled()) {
- logger.info(
- "Append {} logs to {}, resp: {}",
- logList.size(),
- node,
- result);
- }
- } catch (TException e) {
- logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
- }
- }
-
- private AppendEntriesRequest prepareRequest(
- List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
- AppendEntriesRequest request = new AppendEntriesRequest();
-
- request.setLeader(leader);
- request.setTerm(1);
-
- request.setEntries(logList);
- // set index for raft
- request.setPrevLogIndex(currBatch.get(firstIndex).getLog().getCurrLogIndex() - 1);
- try {
- request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
- } catch (Exception e) {
- logger.error("getTerm failed for newly append entries", e);
- }
- return request;
- }
-
- private void sendLogs(List<SendLogRequest> currBatch) {
- int logIndex = 0;
- logger.debug(
- "send logs from index {} to {}",
- currBatch.get(0).getLog().getCurrLogIndex(),
- currBatch.get(currBatch.size() - 1).getLog().getCurrLogIndex());
- while (logIndex < currBatch.size()) {
- long logSize = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
- List<ByteBuffer> logList = new ArrayList<>();
- int prevIndex = logIndex;
-
- for (; logIndex < currBatch.size(); logIndex++) {
- long curSize = currBatch.get(logIndex).getAppendEntryRequest().entry.array().length;
- if (logSize - curSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
- break;
- }
- logSize -= curSize;
- Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- currBatch.get(logIndex).getLog().getCreateTime());
- logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
- }
-
- AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
- appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
- for (; prevIndex < logIndex; prevIndex++) {
- Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
- currBatch.get(prevIndex).getLog().getCreateTime());
- }
- }
- }
-
- private void sendBatchLogs(List<SendLogRequest> currBatch) {
- sendLogs(currBatch);
- }
-
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 0ad1ebb..0e4c8c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -102,18 +103,21 @@ public class ExprMember extends MetaGroupMember {
return processNonPartitionedMetaPlan(plan);
}
- protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+ protected AppendEntryResult appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
if (!useSlidingWindow) {
return super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
}
- long resp;
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
long success = 0;
+ AppendEntryResult result = new AppendEntryResult();
synchronized (logManager) {
long windowPos = log.getCurrLogIndex() - logManager.getLastLogIndex() - 1;
if (windowPos < 0) {
success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
} else if (windowPos < windowSize) {
logWindow[(int) windowPos] = log;
if (windowPos == 0) {
@@ -135,23 +139,24 @@ public class ExprMember extends MetaGroupMember {
for (int i = 1; i <= flushPos; i++) {
logWindow[windowSize - i] = null;
}
- } else {
- System.out.println("not success");
}
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
+ } else {
+ result.status = Response.RESPONSE_WEAK_ACCEPT;
}
} else {
- return Response.RESPONSE_LOG_MISMATCH;
+ return new AppendEntryResult(Response.RESPONSE_LOG_MISMATCH);
}
}
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
- if (success != -1) {
- resp = Response.RESPONSE_AGREE;
- } else {
+ if (success == -1) {
// the incoming log points to an illegal position, reject it
- resp = Response.RESPONSE_LOG_MISMATCH;
+ result.status = Response.RESPONSE_LOG_MISMATCH;
}
- return resp;
+ return result;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 71aa631..ff6bd96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -103,7 +103,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
logRequest.getLog().getCreateTime());
member.sendLogToFollower(
logRequest.getLog(),
- logRequest.getVoteCounter(),
+ logRequest.getVotedNodeIds(),
receiver,
logRequest.getLeaderShipStale(),
logRequest.getNewLeaderTerm(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ae50b7c..b0dd31c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.cluster.log;
+import java.util.Set;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -152,32 +154,35 @@ public class LogDispatcher {
public static class SendLogRequest {
private Log log;
- private AtomicInteger voteCounter;
+ private Set<Integer> votedNodeIds;
private AtomicBoolean leaderShipStale;
private AtomicLong newLeaderTerm;
private AppendEntryRequest appendEntryRequest;
private long enqueueTime;
private Future<ByteBuffer> serializedLogFuture;
+ private int quorumSize;
public SendLogRequest(
Log log,
- AtomicInteger voteCounter,
+ Set<Integer> votedNodeIds,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
- AppendEntryRequest appendEntryRequest) {
+ AppendEntryRequest appendEntryRequest,
+ int quorumSize) {
this.setLog(log);
- this.setVoteCounter(voteCounter);
+ this.setVotedNodeIds(votedNodeIds);
this.setLeaderShipStale(leaderShipStale);
this.setNewLeaderTerm(newLeaderTerm);
this.setAppendEntryRequest(appendEntryRequest);
+ this.setQuorumSize(quorumSize);
}
- public AtomicInteger getVoteCounter() {
- return voteCounter;
+ public Set<Integer> getVotedNodeIds() {
+ return votedNodeIds;
}
- public void setVoteCounter(AtomicInteger voteCounter) {
- this.voteCounter = voteCounter;
+ public void setVotedNodeIds(Set<Integer> votedNodeIds) {
+ this.votedNodeIds = votedNodeIds;
}
public Log getLog() {
@@ -220,6 +225,14 @@ public class LogDispatcher {
this.appendEntryRequest = appendEntryRequest;
}
+ public int getQuorumSize() {
+ return quorumSize;
+ }
+
+ public void setQuorumSize(int quorumSize) {
+ this.quorumSize = quorumSize;
+ }
+
@Override
public String toString() {
return "SendLogRequest{" + "log=" + log + '}';
@@ -272,7 +285,7 @@ public class LogDispatcher {
private void appendEntriesAsync(
List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch)
throws TException {
- AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
+ AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
AsyncClient client = member.getSendLogAsyncClient(receiver);
if (logger.isDebugEnabled()) {
logger.debug(
@@ -302,19 +315,11 @@ public class LogDispatcher {
logger.error("No available client for {}", receiver);
return;
}
- AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
+ AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
try {
- long result = client.appendEntries(request);
+ AppendEntryResult result = client.appendEntries(request);
Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
- if (result != -1 && logger.isInfoEnabled()) {
- logger.info(
- "{}: Append {} logs to {}, resp: {}",
- member.getName(),
- logList.size(),
- receiver,
- result);
- }
handler.onComplete(result);
} catch (TException e) {
client.getInputProtocol().getTransport().close();
@@ -404,18 +409,19 @@ public class LogDispatcher {
logRequest.getLog().getCreateTime());
member.sendLogToFollower(
logRequest.getLog(),
- logRequest.getVoteCounter(),
+ logRequest.getVotedNodeIds(),
receiver,
logRequest.getLeaderShipStale(),
logRequest.getNewLeaderTerm(),
- logRequest.getAppendEntryRequest());
+ logRequest.getAppendEntryRequest(),
+ logRequest.getQuorumSize());
Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
logRequest.getLog().getCreateTime());
}
- class AppendEntriesHandler implements AsyncMethodCallback<Long> {
+ class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
- private final List<AsyncMethodCallback<Long>> singleEntryHandlers;
+ private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
private AppendEntriesHandler(List<SendLogRequest> batch) {
singleEntryHandlers = new ArrayList<>(batch.size());
@@ -423,44 +429,47 @@ public class LogDispatcher {
AppendNodeEntryHandler handler =
getAppendNodeEntryHandler(
sendLogRequest.getLog(),
- sendLogRequest.getVoteCounter(),
+ sendLogRequest.getVotedNodeIds(),
receiver,
sendLogRequest.getLeaderShipStale(),
sendLogRequest.getNewLeaderTerm(),
- peer);
+ peer,
+ sendLogRequest.getQuorumSize());
singleEntryHandlers.add(handler);
}
}
@Override
- public void onComplete(Long aLong) {
- for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
+ public void onComplete(AppendEntryResult aLong) {
+ for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
singleEntryHandler.onComplete(aLong);
}
}
@Override
public void onError(Exception e) {
- for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
+ for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
singleEntryHandler.onError(e);
}
}
private AppendNodeEntryHandler getAppendNodeEntryHandler(
Log log,
- AtomicInteger voteCounter,
+ Set<Integer> voteCounter,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
- Peer peer) {
+ Peer peer,
+ int quorumSize) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setReceiver(node);
- handler.setVoteCounter(voteCounter);
+ handler.setVotedNodeIds(voteCounter);
handler.setLeaderShipStale(leaderShipStale);
handler.setLog(log);
handler.setMember(member);
handler.setPeer(peer);
handler.setReceiverTerm(newLeaderTerm);
+ handler.setQuorumSize(quorumSize);
return handler;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 69469b0..6e07555 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -19,6 +19,15 @@
package org.apache.iotdb.cluster.server;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
@@ -30,8 +39,8 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
@@ -59,7 +68,6 @@ import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.server.service.DataAsyncService;
import org.apache.iotdb.cluster.server.service.DataSyncService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -70,16 +78,6 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
public class DataClusterServer extends RaftServer
implements TSDataService.AsyncIface, TSDataService.Iface {
@@ -262,7 +260,7 @@ public class DataClusterServer extends RaftServer
}
@Override
- public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
+ public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
@@ -271,7 +269,7 @@ public class DataClusterServer extends RaftServer
}
@Override
- public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
Node header = request.getHeader();
DataAsyncService service = getDataAsyncService(header, resultHandler, request);
if (service != null) {
@@ -902,12 +900,12 @@ public class DataClusterServer extends RaftServer
}
@Override
- public long appendEntries(AppendEntriesRequest request) throws TException {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) throws TException {
return getDataSyncService(request.getHeader()).appendEntries(request);
}
@Override
- public long appendEntry(AppendEntryRequest request) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) throws TException {
return getDataSyncService(request.getHeader()).appendEntry(request);
}
@@ -966,26 +964,26 @@ public class DataClusterServer extends RaftServer
}
@Override
- public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+ public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
throws TException {
return getDataSyncService(thisNode)
.appendEntryIndirect(request, subReceivers);
}
@Override
- public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+ public void acknowledgeAppendEntry(AppendEntryResult ack) {
getDataSyncService(thisNode).acknowledgeAppendEntry(ack);
}
@Override
public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
- AsyncMethodCallback<Long> resultHandler) {
+ AsyncMethodCallback<AppendEntryResult> resultHandler) {
getDataAsyncService(thisNode, resultHandler, request)
.appendEntryIndirect(request, subReceivers, resultHandler);
}
@Override
- public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+ public void acknowledgeAppendEntry(AppendEntryResult ack,
AsyncMethodCallback<Void> resultHandler) {
getDataAsyncService(thisNode, resultHandler, ack)
.acknowledgeAppendEntry(ack, resultHandler);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index f18d920..081e722 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.cluster.server;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
@@ -27,8 +29,8 @@ import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -52,7 +54,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.RegisterManager;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -63,9 +64,6 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-
/**
* MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the
* data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is
@@ -319,12 +317,12 @@ public class MetaClusterServer extends RaftServer
}
@Override
- public long appendEntries(AppendEntriesRequest request) throws TException {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) throws TException {
return syncService.appendEntries(request);
}
@Override
- public long appendEntry(AppendEntryRequest request) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) throws TException {
return syncService.appendEntry(request);
}
@@ -374,24 +372,24 @@ public class MetaClusterServer extends RaftServer
}
@Override
- public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+ public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
throws TException {
return syncService.appendEntryIndirect(request, subReceivers);
}
@Override
- public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+ public void acknowledgeAppendEntry(AppendEntryResult ack) {
syncService.acknowledgeAppendEntry(ack);
}
@Override
public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
- AsyncMethodCallback<Long> resultHandler) {
+ AsyncMethodCallback<AppendEntryResult> resultHandler) {
asyncService.appendEntryIndirect(request, subReceivers, resultHandler);
}
@Override
- public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+ public void acknowledgeAppendEntry(AppendEntryResult ack,
AsyncMethodCallback<Void> resultHandler) {
asyncService.acknowledgeAppendEntry(ack, resultHandler);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 523fb0d..36a06ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -49,8 +49,10 @@ public class Response {
// the new node, which tries to join the cluster, contains conflicted parameters with the
// cluster, so the operation is rejected.
public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -10;
+ public static final long RESPONSE_STRONG_ACCEPT = -11;
+ public static final long RESPONSE_WEAK_ACCEPT = -12;
// the request is not executed locally and should be forwarded
- public static final long RESPONSE_NULL = Long.MIN_VALUE;
+ public static final int RESPONSE_NULL = Integer.MIN_VALUE;
private Response() {
// enum-like class
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index fa744d1..b233d20 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -18,42 +18,43 @@
*/
package org.apache.iotdb.cluster.server.handlers.caller;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
+
+import java.net.ConnectException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
-
/**
* AppendNodeEntryHandler checks if the log is successfully appended by the quorum or some node has
* rejected it for some reason when one node has finished the AppendEntryRequest. The log is sent
* to individual nodes, and it requires the agreement of a quorum of those nodes to reach
* consistency.
*/
-public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
+public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryResult> {
private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
private RaftMember member;
private AtomicLong receiverTerm;
private Log log;
- private AtomicInteger voteCounter;
+ private Set<Integer> votedNodeIds;
private AtomicBoolean leaderShipStale;
private Node receiver;
private Peer peer;
+ private int quorumSize;
// initialized as the quorum size, and decrease by 1 each time when we receive a rejection or
// an exception, upon decreased to zero, the request will be early-aborted
private int failedDecreasingCounter;
@@ -69,11 +70,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
}
@Override
- public void onComplete(Long response) {
+ public void onComplete(AppendEntryResult response) {
if (Timer.ENABLE_INSTRUMENTING) {
Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
}
- if (voteCounter.get() == Integer.MAX_VALUE) {
+ if (votedNodeIds.contains(Integer.MAX_VALUE)) {
// the request already failed
return;
}
@@ -82,10 +83,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
// someone has rejected this log because the leadership is stale
return;
}
- long resp = response;
- synchronized (voteCounter) {
- if (resp == RESPONSE_AGREE) {
- int remaining = voteCounter.decrementAndGet();
+ long resp = response.status;
+ synchronized (votedNodeIds) {
+ if (resp == RESPONSE_STRONG_ACCEPT) {
+ votedNodeIds.add(receiver.nodeIdentifier);
+ int remaining = quorumSize - votedNodeIds.size();
logger.debug(
"{}: Received an agreement from {} for {}, remaining votes to succeed: {}",
member.getName(),
@@ -98,7 +100,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
member.getName(),
log.getCurrLogIndex(),
log);
- voteCounter.notifyAll();
+ votedNodeIds.notifyAll();
}
peer.setMatchIndex(Math.max(log.getCurrLogIndex(), peer.getMatchIndex()));
} else if (resp > 0) {
@@ -115,7 +117,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
receiverTerm.set(resp);
}
leaderShipStale.set(true);
- voteCounter.notifyAll();
+ votedNodeIds.notifyAll();
} else {
// e.g., Response.RESPONSE_LOG_MISMATCH
logger.debug(
@@ -143,12 +145,12 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
}
private void onFail() {
- synchronized (voteCounter) {
+ synchronized (votedNodeIds) {
failedDecreasingCounter--;
if (failedDecreasingCounter <= 0) {
// quorum members have failed, there is no need to wait for others
- voteCounter.set(Integer.MAX_VALUE);
- voteCounter.notifyAll();
+ votedNodeIds.add(Integer.MAX_VALUE);
+ votedNodeIds.notifyAll();
}
}
}
@@ -161,10 +163,9 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
this.member = member;
}
- public void setVoteCounter(AtomicInteger voteCounter) {
- this.voteCounter = voteCounter;
- this.failedDecreasingCounter =
- ClusterDescriptor.getInstance().getConfig().getReplicationNum() - voteCounter.get();
+ public void setVotedNodeIds(Set<Integer> votedNodeIds) {
+ this.votedNodeIds = votedNodeIds;
+
}
public void setLeaderShipStale(AtomicBoolean leaderShipStale) {
@@ -182,4 +183,14 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
public void setReceiverTerm(AtomicLong receiverTerm) {
this.receiverTerm = receiverTerm;
}
+
+ public int getQuorumSize() {
+ return quorumSize;
+ }
+
+ public void setQuorumSize(int quorumSize) {
+ this.quorumSize = quorumSize;
+ this.failedDecreasingCounter =
+ ClusterDescriptor.getInstance().getConfig().getReplicationNum() - quorumSize;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
index 71903d4..138a44e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
@@ -20,12 +20,13 @@
package org.apache.iotdb.cluster.server.handlers.forwarder;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class IndirectAppendHandler implements AsyncMethodCallback<Long> {
+public class IndirectAppendHandler implements AsyncMethodCallback<AppendEntryResult> {
private static final Logger logger = LoggerFactory.getLogger(IndirectAppendHandler.class);
private Node receiver;
@@ -38,7 +39,7 @@ public class IndirectAppendHandler implements AsyncMethodCallback<Long> {
}
@Override
- public void onComplete(Long response) {
+ public void onComplete(AppendEntryResult response) {
// ignore response from indirect appender
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5b248b8..666e989 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.cluster.server.member;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncClientPool;
@@ -41,8 +43,8 @@ import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
@@ -543,12 +545,12 @@ public abstract class RaftMember {
* Process an AppendEntryRequest. First check the term of the leader, then parse the log and
* finally see if we can find a position to append the log.
*/
- public long appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
logger.debug("{} received an AppendEntryRequest: {}", name, request);
// the term checked here is that of the leader, not that of the log
long checkResult = checkRequestTerm(request.term, request.leader);
if (checkResult != Response.RESPONSE_AGREE) {
- return checkResult;
+ return new AppendEntryResult(checkResult);
}
long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
@@ -557,30 +559,32 @@ public abstract class RaftMember {
log.setByteSize(logByteSize);
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
- long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
- logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result);
+ AppendEntryResult result = appendEntry(request.prevLogIndex, request.prevLogTerm,
+ request.leaderCommit, log);
+
+ logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result.status);
if (!request.isFromLeader) {
- appendAckLeader(request.leader, log, result);
+ appendAckLeader(request.leader, log, result.status);
}
return result;
}
private void appendAckLeader(Node leader, Log log, long response) {
- AppendEntryAcknowledgement appendEntryAcknowledgement = new AppendEntryAcknowledgement();
- appendEntryAcknowledgement.index = log.getCurrLogIndex();
- appendEntryAcknowledgement.term = log.getCurrLogTerm();
- appendEntryAcknowledgement.response = response;
+ AppendEntryResult result = new AppendEntryResult();
+ result.setLastLogIndex(log.getCurrLogIndex());
+ result.setLastLogTerm(log.getCurrLogTerm());
+ result.status = response;
Client syncClient = null;
try {
if (config.isUseAsyncServer()) {
GenericHandler<Void> handler = new GenericHandler<>(leader, null);
- getAsyncClient(leader).acknowledgeAppendEntry(appendEntryAcknowledgement, handler);
+ getAsyncClient(leader).acknowledgeAppendEntry(result, handler);
} else {
syncClient = getSyncClient(leader);
- syncClient.acknowledgeAppendEntry(appendEntryAcknowledgement);
+ syncClient.acknowledgeAppendEntry(result);
}
} catch (TException e) {
logger.warn("Cannot send ack of {} to leader {}", log, leader, e);
@@ -591,8 +595,9 @@ public abstract class RaftMember {
}
}
- public long appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers) throws UnknownLogTypeException {
- long result = appendEntry(request);
+ public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers)
+ throws UnknownLogTypeException {
+ AppendEntryResult result = appendEntry(request);
request.entry.rewind();
appendLogThreadPool.submit(() -> sendLogToSubFollowers(request, subFollowers));
return result;
@@ -623,16 +628,16 @@ public abstract class RaftMember {
/**
* Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
*/
- public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
// the term checked here is that of the leader, not that of the log
long checkResult = checkRequestTerm(request.term, request.leader);
if (checkResult != Response.RESPONSE_AGREE) {
- return checkResult;
+ return new AppendEntryResult(checkResult);
}
- long response;
+ AppendEntryResult response;
List<Log> logs = new ArrayList<>();
int logByteSize = 0;
long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
@@ -690,16 +695,19 @@ public abstract class RaftMember {
public void sendLogAsync(
Log log,
- AtomicInteger voteCounter,
+ Set<Integer> votedNodeIds,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- Peer peer, List<Node> indirectReceivers) {
+ Peer peer,
+ int quorumSize,
+ List<Node> indirectReceivers) {
AsyncClient client = getSendLogAsyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
+ getAppendNodeEntryHandler(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, peer
+ , quorumSize);
try {
if (indirectReceivers == null || indirectReceivers.isEmpty()) {
client.appendEntry(request, handler);
@@ -1135,9 +1143,10 @@ public abstract class RaftMember {
try {
AppendLogResult appendLogResult =
waitAppendResult(
- sendLogRequest.getVoteCounter(),
+ sendLogRequest.getVotedNodeIds(),
sendLogRequest.getLeaderShipStale(),
- sendLogRequest.getNewLeaderTerm());
+ sendLogRequest.getNewLeaderTerm(),
+ sendLogRequest.getQuorumSize());
Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
sendLogRequest.getLog().getCreateTime());
switch (appendLogResult) {
@@ -1162,7 +1171,7 @@ public abstract class RaftMember {
}
public SendLogRequest buildSendLogRequest(Log log) {
- AtomicInteger voteCounter = new AtomicInteger(allNodes.size() / 2);
+ Set<Integer> votedNodeIds = new HashSet<>(allNodes.size());
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(term.get());
@@ -1170,7 +1179,8 @@ public abstract class RaftMember {
AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
- return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
+ return new SendLogRequest(log, votedNodeIds, leaderShipStale, newLeaderTerm,
+ appendEntryRequest, allNodes.size() / 2);
}
/**
@@ -1526,17 +1536,18 @@ public abstract class RaftMember {
*/
@SuppressWarnings({"java:S2445"}) // safe synchronized
private AppendLogResult waitAppendResult(
- AtomicInteger voteCounter, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm) {
+ Set<Integer> votedNodeIds, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm,
+ int quorumSize) {
// wait for the followers to vote
long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
- synchronized (voteCounter) {
+ synchronized (votedNodeIds) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
- while (voteCounter.get() > 0
+ while (votedNodeIds.size() >= quorumSize
&& alreadyWait < RaftServer.getWriteOperationTimeoutMS()
- && voteCounter.get() != Integer.MAX_VALUE) {
+ && !votedNodeIds.contains(Integer.MAX_VALUE)) {
try {
- voteCounter.wait(RaftServer.getWriteOperationTimeoutMS());
+ votedNodeIds.wait(RaftServer.getWriteOperationTimeoutMS());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when sending a log", e);
@@ -1557,7 +1568,7 @@ public abstract class RaftMember {
}
// cannot get enough agreements within a certain amount of time
- if (voteCounter.get() > 0) {
+ if (votedNodeIds.size() < quorumSize) {
return AppendLogResult.TIME_OUT;
}
@@ -1775,11 +1786,11 @@ public abstract class RaftMember {
private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
if (requiredQuorum <= 0) {
// use half of the members' size as the quorum
- return sendLogToFollowers(log, new AtomicInteger(allNodes.size() / 2));
+ return sendLogToFollowers(log, new HashSet<>(), allNodes.size() / 2);
} else {
// make sure quorum does not exceed the number of members - 1
return sendLogToFollowers(
- log, new AtomicInteger(Math.min(requiredQuorum, allNodes.size() - 1)));
+ log, new HashSet<>(), Math.min(requiredQuorum, allNodes.size() - 1));
}
}
@@ -1789,10 +1800,10 @@ public abstract class RaftMember {
* than the local term, retire from leader and return a LEADERSHIP_STALE. If "votedNodeIds" still
* holds fewer than "quorumSize" nodes after a certain time, return TIME_OUT.
*
- * @param voteCounter a decreasing vote counter
+ * @param votedNodeIds a set of voted nodes' identifiers
* @return an AppendLogResult indicating a success or a failure and why
*/
- private AppendLogResult sendLogToFollowers(Log log, AtomicInteger voteCounter) {
+ private AppendLogResult sendLogToFollowers(Log log, Set<Integer> votedNodeIds, int quorumSize) {
if (allNodes.size() == 1) {
// single node group, does not need the agreement of others
return AppendLogResult.OK;
@@ -1814,7 +1825,7 @@ public abstract class RaftMember {
appendLogThreadPool.submit(
() ->
sendLogToFollower(
- log, voteCounter, node, leaderShipStale, newLeaderTerm, request));
+ log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, quorumSize));
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
}
@@ -1823,7 +1834,7 @@ public abstract class RaftMember {
// there is only one member, send to it within this thread to reduce thread switching
// overhead
for (Node node : allNodes) {
- sendLogToFollower(log, voteCounter, node, leaderShipStale, newLeaderTerm, request);
+ sendLogToFollower(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, quorumSize);
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
}
@@ -1835,18 +1846,19 @@ public abstract class RaftMember {
return AppendLogResult.TIME_OUT;
}
- return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
+ return waitAppendResult(votedNodeIds, leaderShipStale, newLeaderTerm, quorumSize);
}
public void sendLogToFollower(
Log log,
- AtomicInteger voteCounter,
+ Set<Integer> votedNodeIds,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
- AppendEntryRequest request) {
- sendLogToFollower(log, voteCounter, node, leaderShipStale, newLeaderTerm, request,
- Collections.emptyList());
+ AppendEntryRequest request,
+ int quorumSize) {
+ sendLogToFollower(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request,
+ quorumSize, Collections.emptyList());
}
/**
@@ -1854,11 +1866,12 @@ public abstract class RaftMember {
*/
public void sendLogToFollower(
Log log,
- AtomicInteger voteCounter,
+ Set<Integer> votedNodeIds,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
+ int quorumSize,
List<Node> indirectReceivers) {
if (node.equals(thisNode)) {
return;
@@ -1880,9 +1893,11 @@ public abstract class RaftMember {
}
if (config.isUseAsyncServer()) {
- sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
+ sendLogAsync(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, peer,
+ quorumSize, indirectReceivers);
} else {
- sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
+ sendLogSync(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, request, peer,
+ quorumSize, indirectReceivers);
}
}
@@ -1916,19 +1931,20 @@ public abstract class RaftMember {
private void sendLogSync(
Log log,
- AtomicInteger voteCounter,
+ Set<Integer> votedNodeIds,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- Peer peer, List<Node> indirectReceivers) {
+ Peer peer, int quorumSize, List<Node> indirectReceivers) {
Client client = getSyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
+ getAppendNodeEntryHandler(log, votedNodeIds, node, leaderShipStale, newLeaderTerm, peer
+ , quorumSize);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
- long result;
+ AppendEntryResult result;
if (indirectReceivers == null || indirectReceivers.isEmpty()) {
result = client.appendEntry(request);
} else {
@@ -1949,19 +1965,21 @@ public abstract class RaftMember {
public AppendNodeEntryHandler getAppendNodeEntryHandler(
Log log,
- AtomicInteger voteCounter,
+ Set<Integer> votedNodeIds,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
- Peer peer) {
+ Peer peer,
+ int quorumSize) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setReceiver(node);
- handler.setVoteCounter(voteCounter);
+ handler.setVotedNodeIds(votedNodeIds);
handler.setLeaderShipStale(leaderShipStale);
handler.setLog(log);
handler.setMember(this);
handler.setPeer(peer);
handler.setReceiverTerm(newLeaderTerm);
+ handler.setQuorumSize(quorumSize);
return handler;
}
@@ -1981,26 +1999,32 @@ public abstract class RaftMember {
* @return an AppendEntryResult whose status is Response.RESPONSE_STRONG_ACCEPT when the log is
* successfully appended or Response.RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+ protected AppendEntryResult appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit,
+ Log log) {
long resp = checkPrevLogIndex(prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
- return resp;
+ return new AppendEntryResult(resp);
}
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
long success;
+ AppendEntryResult result = new AppendEntryResult();
synchronized (logManager) {
success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+ if (success != -1) {
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
+ }
}
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
if (success != -1) {
logger.debug("{} append a new log {}", name, log);
- resp = Response.RESPONSE_AGREE;
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
} else {
// the incoming log points to an illegal position, reject it
- resp = Response.RESPONSE_LOG_MISMATCH;
+ result.status = Response.RESPONSE_LOG_MISMATCH;
}
- return resp;
+ return result;
}
/**
@@ -2053,7 +2077,7 @@ public abstract class RaftMember {
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- private long appendEntries(
+ private AppendEntryResult appendEntries(
long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
logger.debug(
"{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
@@ -2062,14 +2086,15 @@ public abstract class RaftMember {
prevLogTerm,
leaderCommit);
if (logs.isEmpty()) {
- return Response.RESPONSE_AGREE;
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
long resp = checkPrevLogIndex(prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
- return resp;
+ return new AppendEntryResult(resp);
}
+ AppendEntryResult result = new AppendEntryResult();
synchronized (logManager) {
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
@@ -2078,13 +2103,15 @@ public abstract class RaftMember {
if (logger.isDebugEnabled()) {
logger.debug("{} append a new log list {}, commit to {}", name, logs, leaderCommit);
}
- resp = Response.RESPONSE_AGREE;
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
} else {
// the incoming log points to an illegal position, reject it
- resp = Response.RESPONSE_LOG_MISMATCH;
+ result.status = Response.RESPONSE_LOG_MISMATCH;
}
}
- return resp;
+ return result;
}
/**
@@ -2128,13 +2155,14 @@ public abstract class RaftMember {
/**
* Process the result from an indirect receiver of an entry.
+ *
* @param ack acknowledgement from an indirect receiver.
*/
- public void acknowledgeAppendLog(AppendEntryAcknowledgement ack) {
+ public void acknowledgeAppendLog(AppendEntryResult ack) {
AppendNodeEntryHandler appendNodeEntryHandler = sentLogHandlers
- .get(new Pair<>(ack.index, ack.term));
+ .get(new Pair<>(ack.lastLogIndex, ack.lastLogTerm));
if (appendNodeEntryHandler != null) {
- appendNodeEntryHandler.onComplete(ack.response);
+ appendNodeEntryHandler.onComplete(ack);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index eecf26e..5fadd69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -19,12 +19,16 @@
package org.apache.iotdb.cluster.server.service;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.util.List;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
@@ -39,15 +43,9 @@ import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-
public abstract class BaseAsyncService implements RaftService.AsyncIface {
RaftMember member;
@@ -70,7 +68,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
}
@Override
- public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
try {
resultHandler.onComplete(member.appendEntry(request));
} catch (UnknownLogTypeException e) {
@@ -79,7 +77,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
}
@Override
- public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
+ public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
try {
resultHandler.onComplete(member.appendEntries(request));
} catch (Exception e) {
@@ -177,7 +175,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
}
@Override
- public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+ public void acknowledgeAppendEntry(AppendEntryResult ack,
AsyncMethodCallback<Void> resultHandler) {
member.acknowledgeAppendLog(ack);
resultHandler.onComplete(null);
@@ -185,7 +183,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
@Override
public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
- AsyncMethodCallback<Long> resultHandler) {
+ AsyncMethodCallback<AppendEntryResult> resultHandler) {
try {
resultHandler.onComplete(member.appendEntryIndirect(request, subReceivers));
} catch (UnknownLogTypeException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index 181ad8a..6c795d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -28,8 +28,8 @@ import java.util.List;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
@@ -68,7 +68,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
}
@Override
- public long appendEntry(AppendEntryRequest request) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) throws TException {
try {
return member.appendEntry(request);
} catch (UnknownLogTypeException e) {
@@ -77,7 +77,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
}
@Override
- public long appendEntries(AppendEntriesRequest request) throws TException {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) throws TException {
try {
return member.appendEntries(request);
} catch (BufferUnderflowException e) {
@@ -159,12 +159,12 @@ public abstract class BaseSyncService implements RaftService.Iface {
}
@Override
- public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+ public void acknowledgeAppendEntry(AppendEntryResult ack) {
member.acknowledgeAppendLog(ack);
}
@Override
- public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+ public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
throws TException {
try {
return member.appendEntryIndirect(request, subReceivers);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 88626ba..86b8f25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
@@ -54,11 +55,11 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
}
@Override
- public long appendEntry(AppendEntryRequest request) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request) throws TException {
if (metaGroupMember.getPartitionTable() == null) {
// this node lacks information of the cluster and refuse to work
logger.debug("This node is blind to the cluster and cannot accept logs");
- return Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE;
+ return new AppendEntryResult(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
}
return super.appendEntry(request);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index dab8844..39a0457 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -74,7 +74,7 @@ public class AppendNodeEntryHandlerTest {
for (int i = 0; i < 10; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVoteCounter(quorum);
+ handler.setVotedNodeIds(quorum);
handler.setLog(log);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
@@ -104,7 +104,7 @@ public class AppendNodeEntryHandlerTest {
for (int i = 0; i < 3; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVoteCounter(quorum);
+ handler.setVotedNodeIds(quorum);
handler.setLog(log);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
@@ -129,7 +129,7 @@ public class AppendNodeEntryHandlerTest {
synchronized (quorum) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVoteCounter(quorum);
+ handler.setVotedNodeIds(quorum);
handler.setLog(log);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
@@ -156,7 +156,7 @@ public class AppendNodeEntryHandlerTest {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
- handler.setVoteCounter(quorum);
+ handler.setVotedNodeIds(quorum);
handler.setLog(log);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
index d2eeca4..ddc6eb4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
@@ -208,8 +209,8 @@ public class BaseMember {
}
@Override
- public long appendEntry(AppendEntryRequest request) {
- return Response.RESPONSE_AGREE;
+ public AppendEntryResult appendEntry(AppendEntryRequest request) {
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java
index 694c4aa..dcf36f7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.cluster.config.ConsistencyLevel;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
@@ -184,8 +185,8 @@ public class RaftMemberTest extends BaseMember {
}
@Override
- public long appendEntry(AppendEntryRequest request) {
- return Response.RESPONSE_AGREE;
+ public AppendEntryResult appendEntry(AppendEntryRequest request) {
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
@Override
@@ -221,8 +222,8 @@ public class RaftMemberTest extends BaseMember {
}
@Override
- public long appendEntry(AppendEntryRequest request) {
- return Response.RESPONSE_AGREE;
+ public AppendEntryResult appendEntry(AppendEntryRequest request) {
+ return new AppendEntryResult(Response.RESPONSE_AGREE);
}
@Override
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 449e843..dc40b3f 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -271,6 +271,12 @@ struct GetAllPathsResult {
2: optional list<string> aliasList
}
+struct AppendEntryResult {
+ 1: required i64 status;
+ 2: optional i64 lastLogTerm;
+ 3: optional i64 lastLogIndex;
+}
+
service RaftService {
/**
@@ -303,7 +309,7 @@ service RaftService {
* @param request entries that need to be appended and the information of the leader.
* @return -1: agree, -2: log index mismatch , otherwise return the follower's term
**/
- long appendEntries(1:AppendEntriesRequest request)
+ AppendEntryResult appendEntries(1:AppendEntriesRequest request)
/**
* Leader will call this method to send an entry to all followers.
@@ -314,12 +320,12 @@ service RaftService {
* @param request entry that needs to be appended and the information of the leader.
* @return -1: agree, -2: log index mismatch , otherwise return the follower's term
**/
- long appendEntry(1:AppendEntryRequest request)
+ AppendEntryResult appendEntry(1:AppendEntryRequest request)
/**
* Like appendEntry, but the receiver should forward the request to nodes in subReceivers.
**/
- long appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers)
+ AppendEntryResult appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers)
void sendSnapshot(1:SendSnapshotRequest request)
@@ -357,7 +363,7 @@ service RaftService {
* when a follower receives an AppendEntryRequest from a non-leader node, it sends this ack to
* the leader so the leader can know whether it has successfully received the entry
**/
- void acknowledgeAppendEntry(1: AppendEntryAcknowledgement ack)
+ void acknowledgeAppendEntry(1: AppendEntryResult ack)
}