You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/27 01:04:19 UTC
[iotdb] branch native_raft updated: fix snapshot catch-up
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 522244b985 fix snapshot catch-up
522244b985 is described below
commit 522244b985443d5f276184ffa6307cfff98bd835
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Feb 27 09:05:42 2023 +0800
fix snapshot catch-up
---
.gitignore | 2 +
consensus/pom.xml | 2 +-
.../org/apache/iotdb/consensus/common/Utils.java | 6 +
.../iotdb/consensus/natraft/RaftConsensus.java | 4 +-
.../natraft/client/SyncClientAdaptor.java | 10 ++
.../consensus/natraft/protocol/RaftConfig.java | 2 +-
.../consensus/natraft/protocol/RaftMember.java | 35 +++-
.../protocol/heartbeat/ElectionReqHandler.java | 41 +++--
.../protocol/heartbeat/HeartbeatReqHandler.java | 19 ++
.../protocol/heartbeat/HeartbeatRespHandler.java | 2 +-
.../natraft/protocol/log/catchup/CatchUpTask.java | 12 +-
.../log/catchup/SnapshotCatchUpHandler.java | 14 +-
.../protocol/log/catchup/SnapshotCatchUpTask.java | 30 ++--
.../manager/DirectorySnapshotRaftLogManager.java | 20 ++-
.../protocol/log/manager/RaftLogManager.java | 16 +-
.../protocol/log/snapshot/DirectorySnapshot.java | 191 ++++++++++++++++++++-
.../natraft/protocol/log/snapshot/Snapshot.java | 3 +-
.../natraft/service/RaftRPCServiceProcessor.java | 23 ++-
.../iotdb/consensus/natraft/utils/IOUtils.java | 95 ++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
thrift-raft/pom.xml | 2 +-
thrift-raft/src/main/thrift/raft.thrift | 9 +-
22 files changed, 471 insertions(+), 68 deletions(-)
diff --git a/.gitignore b/.gitignore
index ee4b9d2f73..afcbcf77ef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,3 +123,5 @@ node3/
antlr/gen/
antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/gen/
antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.tokens
+distribution/test/
+distribution/distribute-fit.sh
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 89975e7bba..fb72d61faf 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -67,7 +67,7 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
- <artifactId>thrift-raft-consensus</artifactId>
+ <artifactId>iotdb-thrift-raft-consensus</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
index 5803157ea5..d2dbd5c5dc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.consensus.common;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +37,10 @@ import java.util.List;
public class Utils {
private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+ public static String buildPeerDir(File storageDir, ConsensusGroupId groupId) {
+ return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
+ }
+
public static List<Path> listAllRegularFilesRecursively(File rootDir) {
List<Path> allFiles = new ArrayList<>();
try {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 81fe9ebeb0..03ff338c79 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -111,6 +111,7 @@ public class RaftConsensus implements IConsensus {
} else {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
for (Path path : stream) {
+ logger.info("Recovering a RaftMember from {}", path);
Path fileName = path.getFileName();
String[] items = fileName.toString().split("_");
if (items.length != 2) {
@@ -121,6 +122,7 @@ public class RaftConsensus implements IConsensus {
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
RaftMember raftMember =
new RaftMember(
+ path.toString(),
config,
new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
@@ -202,7 +204,7 @@ public class RaftConsensus implements IConsensus {
}
RaftMember impl =
new RaftMember(
- config, thisPeer, peers, groupId, registry.apply(groupId), clientManager);
+ path, config, thisPeer, peers, groupId, registry.apply(groupId), clientManager);
impl.start();
return impl;
});
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index 15ffa4aa67..fef5e19c26 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -18,6 +18,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -82,6 +83,15 @@ public class SyncClientAdaptor {
return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
}
+ public static ByteBuffer readFile(
+ AsyncRaftServiceClient client, String remotePath, long offset, int fetchSize)
+ throws InterruptedException, TException {
+ GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getEndpoint());
+
+ client.readFile(remotePath, offset, fetchSize, handler);
+ return handler.getResult(config.getConnectionTimeoutInMS());
+ }
+
public static void setConfig(RaftConfig config) {
SyncClientAdaptor.config = config;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 7bc82ed8f9..b29d51c951 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -93,7 +93,7 @@ public class RaftConfig {
private RPCConfig rpcConfig;
public RaftConfig(ConsensusConfig config) {
- this.storageDir = config.getStorageDir() + File.separator + "system";
+ this.storageDir = config.getStorageDir();
new File(this.storageDir).mkdirs();
this.rpcConfig = config.getRPCConfig();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 97ad720f88..e0019b89c5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -187,6 +187,7 @@ public class RaftMember {
private FlowBalancer flowBalancer;
public RaftMember(
+ String storageDir,
RaftConfig config,
Peer thisNode,
List<Peer> allNodes,
@@ -194,6 +195,7 @@ public class RaftMember {
IStateMachine stateMachine,
IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager) {
this.config = config;
+ this.storageDir = storageDir;
initConfig();
this.thisNode = thisNode;
@@ -252,7 +254,7 @@ public class RaftMember {
for (int i = 0; i < size; i++) {
allNodes.add(Peer.deserialize(buffer));
}
- logger.info("Recover IoTConsensus server Impl, configuration: {}", allNodes);
+ logger.info("{}: Recover IoTConsensus server Impl, configuration: {}", name, allNodes);
} catch (IOException e) {
logger.error("Unexpected error occurs when recovering configuration", e);
}
@@ -304,7 +306,6 @@ public class RaftMember {
private void initConfig() {
this.enableWeakAcceptance = config.isEnableWeakAcceptance();
- this.storageDir = config.getStorageDir();
}
public void initPeerMap() {
@@ -1006,10 +1007,32 @@ public class RaftMember {
}
}
- public void installSnapshot(ByteBuffer snapshotBytes) {
- DirectorySnapshot directorySnapshot = new DirectorySnapshot(null);
- directorySnapshot.deserialize(snapshotBytes);
- directorySnapshot.install(this);
+ public String getLocalSnapshotTmpDir(String remoteSnapshotDirName) {
+ return config.getStorageDir()
+ + File.separator
+ + groupId
+ + File.separator
+ + "remote_snapshot"
+ + File.separator
+ + remoteSnapshotDirName;
+ }
+
+ public TSStatus installSnapshot(ByteBuffer snapshotBytes, TEndPoint source) {
+ if (!snapshotApplyLock.tryLock()) {
+ return new TSStatus(TSStatusCode.SNAPSHOT_INSTALLING.getStatusCode());
+ }
+
+ DirectorySnapshot directorySnapshot;
+ try {
+ directorySnapshot = new DirectorySnapshot(null, null);
+ directorySnapshot.deserialize(snapshotBytes);
+ directorySnapshot.setSource(source);
+ directorySnapshot.setMemberName(name);
+ directorySnapshot.install(this);
+ } finally {
+ snapshotApplyLock.unlock();
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
public boolean containsNode(Peer node) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
index a39a9524b3..cb1e74172b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
@@ -42,12 +42,26 @@ public class ElectionReqHandler {
}
public long processElectionRequest(ElectionRequest electionRequest) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: start to handle request from elector {}",
+ TEndPoint candidate = electionRequest.getElector();
+ Peer peer =
+ new Peer(
+ Factory.createFromTConsensusGroupId(electionRequest.groupId),
+ electionRequest.electorId,
+ candidate);
+ // check if the node is in the group
+ if (!member.containsNode(peer)) {
+ logger.info(
+ "{}: the elector {} is not in the data group {}, so reject this election.",
member.getName(),
- electionRequest.getElector());
+ peer,
+ member.getAllNodes());
+ return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: start to handle request from elector {}", member.getName(), peer);
}
+
long currentTerm = member.getStatus().getTerm().get();
long response =
checkElectorTerm(currentTerm, electionRequest.getTerm(), electionRequest.getElector());
@@ -56,7 +70,7 @@ public class ElectionReqHandler {
}
// compare the log progress of the elector with this node
- response = checkElectorLogProgress(electionRequest);
+ response = checkElectorLogProgress(electionRequest, peer);
logger.info(
"{} sending response {} to the elector {}",
member.getName(),
@@ -109,22 +123,7 @@ public class ElectionReqHandler {
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
* smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
- long checkElectorLogProgress(ElectionRequest electionRequest) {
- TEndPoint candidate = electionRequest.getElector();
- Peer peer =
- new Peer(
- Factory.createFromTConsensusGroupId(electionRequest.groupId),
- electionRequest.electorId,
- candidate);
- // check if the node is in the group
- if (!member.containsNode(peer)) {
- logger.info(
- "{}: the elector {} is not in the data group {}, so reject this election.",
- member.getName(),
- member.getAllNodes(),
- candidate);
- return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
- }
+ long checkElectorLogProgress(ElectionRequest electionRequest, Peer peer) {
long thatTerm = electionRequest.getTerm();
long thatLastLogIndex = electionRequest.getLastLogIndex();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
index 55d63af7f2..615a34224b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
index ae13851b02..326c5367f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
@@ -116,7 +116,7 @@ public class HeartbeatRespHandler implements AsyncMethodCallback<HeartBeatRespon
if (lastLogIdx == peerInfo.getLastHeartBeatIndex() && !resp.isInstallingSnapshot()) {
// the follower's lastLogIndex is unchanged, increase inconsistent counter
int inconsistentNum = peerInfo.incInconsistentHeartbeatNum();
- if (inconsistentNum >= 5000) {
+ if (inconsistentNum >= 5) {
logger.info(
"{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
memberName,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
index 11292d0b26..e7ae9e2dca 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
@@ -302,7 +302,12 @@ public class CatchUpTask implements Runnable {
raftMember.getLogManager().takeSnapshot();
snapshot = raftMember.getLogManager().getSnapshot(peerInfo.getMatchIndex());
if (logger.isInfoEnabled()) {
- logger.info("{}: Logs in {} are too old, catch up with snapshot", raftMember.getName(), node);
+ logger.info(
+ "{}: Logs in {} are too old, catch up with snapshot {}-{}",
+ raftMember.getName(),
+ node,
+ snapshot.getLastLogIndex(),
+ snapshot.getLastLogTerm());
}
}
@@ -349,7 +354,7 @@ public class CatchUpTask implements Runnable {
catchUpSucceeded = task.call();
}
if (catchUpSucceeded) {
- // the catch up may be triggered by an old heartbeat, and the node may have already
+ // the catch-up may be triggered by an old heartbeat, and the node may have already
// caught up, so logs can be empty
if (!logs.isEmpty() || snapshot != null) {
long lastIndex =
@@ -367,6 +372,9 @@ public class CatchUpTask implements Runnable {
System.currentTimeMillis() - startTime);
}
peerInfo.resetInconsistentHeartbeatNum();
+ } else {
+ // wait for a while before the next catch-up so that the status of the follower may update
+ Thread.sleep(5000);
}
} catch (LeaderUnknownException e) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
index 56af324502..05bd80e680 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
@@ -26,27 +27,28 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/** SnapshotCatchUpHandler receives the result of sending a snapshot to a stale node. */
-public class SnapshotCatchUpHandler implements AsyncMethodCallback<Void> {
+public class SnapshotCatchUpHandler implements AsyncMethodCallback<TSStatus> {
private static final Logger logger = LoggerFactory.getLogger(SnapshotCatchUpHandler.class);
- private AtomicBoolean succeed;
+ private AtomicReference<TSStatus> succeed;
private Peer receiver;
private Snapshot snapshot;
- public SnapshotCatchUpHandler(AtomicBoolean succeed, Peer receiver, Snapshot snapshot) {
+ public SnapshotCatchUpHandler(
+ AtomicReference<TSStatus> succeed, Peer receiver, Snapshot snapshot) {
this.succeed = succeed;
this.receiver = receiver;
this.snapshot = snapshot;
}
@Override
- public void onComplete(Void resp) {
+ public void onComplete(TSStatus resp) {
synchronized (succeed) {
- succeed.set(true);
+ succeed.set(resp);
succeed.notifyAll();
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
index 437c25d645..f8161e4db8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.exception.LeaderUnknownException;
@@ -27,6 +28,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
import org.apache.iotdb.consensus.raft.thrift.SendSnapshotRequest;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -35,7 +37,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* SnapshotCatchUpTask first sends the snapshot to the stale node then sends the logs to the node.
@@ -62,6 +64,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
private void doSnapshotCatchUp() throws TException, InterruptedException, LeaderUnknownException {
SendSnapshotRequest request = new SendSnapshotRequest();
request.setGroupId(raftMember.getRaftGroupId().convertToTConsensusGroupId());
+ request.setSource(raftMember.getThisNode().getEndpoint());
logger.info("Start to send snapshot to {}", node);
ByteBuffer data = snapshot.serialize();
if (logger.isInfoEnabled()) {
@@ -76,34 +79,39 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
}
}
- abort = !sendSnapshotAsync(request);
+ TSStatus tsStatus = sendSnapshotAsync(request);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ abort = true;
+ logger.warn("Failed to send snapshot to {}: {}", node, tsStatus);
+ }
}
@SuppressWarnings("java:S2274") // enable timeout
- private boolean sendSnapshotAsync(SendSnapshotRequest request)
+ private TSStatus sendSnapshotAsync(SendSnapshotRequest request)
throws TException, InterruptedException {
- AtomicBoolean succeed = new AtomicBoolean(false);
- SnapshotCatchUpHandler handler = new SnapshotCatchUpHandler(succeed, node, snapshot);
+ AtomicReference<TSStatus> result =
+ new AtomicReference<>(new TSStatus(TSStatusCode.TIME_OUT.getStatusCode()));
+ SnapshotCatchUpHandler handler = new SnapshotCatchUpHandler(result, node, snapshot);
AsyncRaftServiceClient client = raftMember.getClient(node.getEndpoint());
if (client == null) {
logger.info("{}: client null for node {}", raftMember.getThisNode(), node);
abort = true;
- return false;
+ return result.get();
}
logger.info(
"{}: the snapshot request size={}",
raftMember.getName(),
request.getSnapshotBytes().length);
- synchronized (succeed) {
+ synchronized (result) {
client.sendSnapshot(request, handler);
catchUpManager.registerTask(node);
- succeed.wait(sendSnapshotWaitMs);
+ result.wait(sendSnapshotWaitMs);
}
if (logger.isInfoEnabled()) {
- logger.info("send snapshot to node {} success {}", raftMember.getThisNode(), succeed.get());
+ logger.info("send snapshot to node {} success {}", raftMember.getThisNode(), result.get());
}
- return succeed.get();
+ return result.get();
}
@Override
@@ -122,7 +130,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
} else {
logger.warn("{}: Log catch up {} failed", raftMember.getName(), node);
}
- // the next catch up is enabled
+ // the next catch-up is enabled
catchUpManager.unregisterTask(node);
return !abort;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index c353d1d8cc..d6a032d32a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -25,12 +25,18 @@ import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
+import org.apache.iotdb.consensus.natraft.utils.IOUtils;
import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
public class DirectorySnapshotRaftLogManager extends RaftLogManager {
private File latestSnapshotDir;
+ private long snapshotIndex;
+ private long snapshotTerm;
+ DirectorySnapshot directorySnapshot;
public DirectorySnapshotRaftLogManager(
StableEntryManager stableEntryManager,
@@ -43,7 +49,7 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
@Override
public Snapshot getSnapshot(long minLogIndex) {
- return new DirectorySnapshot(latestSnapshotDir);
+ return directorySnapshot;
}
@Override
@@ -55,6 +61,18 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
+ getName()
+ "-snapshot-"
+ System.currentTimeMillis());
+ try {
+ lock.readLock().lock();
+ snapshotIndex = getAppliedIndex();
+ snapshotTerm = getAppliedTerm();
+ } finally {
+ lock.readLock().unlock();
+ }
stateMachine.takeSnapshot(latestSnapshotDir);
+ List<Path> snapshotFiles = stateMachine.getSnapshotFiles(latestSnapshotDir);
+ snapshotFiles.addAll(IOUtils.collectPaths(latestSnapshotDir));
+ directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles);
+ directorySnapshot.setLastLogIndex(snapshotIndex);
+ directorySnapshot.setLastLogTerm(snapshotTerm);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 342c63e9fe..682bec242a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -752,6 +752,10 @@ public abstract class RaftLogManager {
return appliedIndex;
}
+ public long getAppliedTerm() {
+ return appliedTerm;
+ }
+
/** check whether delete the committed log */
void checkDeleteLog() {
try {
@@ -881,10 +885,16 @@ public abstract class RaftLogManager {
}
}
}
- synchronized (changeApplyCommitIndexCond) {
- // maxHaveAppliedCommitIndex may change if a snapshot is applied concurrently
- appliedIndex = Math.max(appliedIndex, nextToCheckIndex);
+ if (nextToCheckIndex > appliedIndex) {
+ synchronized (changeApplyCommitIndexCond) {
+ // maxHaveAppliedCommitIndex may change if a snapshot is applied concurrently
+ if (nextToCheckIndex > appliedIndex) {
+ appliedTerm = log.getCurrLogTerm();
+ appliedIndex = nextToCheckIndex;
+ }
+ }
}
+
logger.debug(
"{}: log={} is applied, nextToCheckIndex={}, commitIndex={}, maxHaveAppliedCommitIndex={}",
name,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index c218104c88..d1fd917684 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -4,26 +4,65 @@
package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
+import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
public class DirectorySnapshot extends Snapshot {
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectorySnapshot.class);
private File directory;
+ private List<Path> filePaths;
+ private TEndPoint source;
+ private String memberName;
- public DirectorySnapshot(File directory) {
+ public DirectorySnapshot(File directory, List<Path> filePaths) {
this.directory = directory;
+ this.filePaths = filePaths;
}
@Override
public ByteBuffer serialize() {
- byte[] bytes = directory.getAbsolutePath().getBytes();
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2 + bytes.length + Integer.BYTES);
+
+ byte[] rootBytes = directory.getAbsolutePath().getBytes(StandardCharsets.UTF_8);
+ int bufferSize = Long.BYTES * 2 + rootBytes.length + Integer.BYTES * 2;
+ byte[][] filePathBytes = new byte[filePaths.size()][];
+ for (int i = 0; i < filePaths.size(); i++) {
+ Path path = filePaths.get(i);
+ byte[] bytes = path.toString().getBytes(StandardCharsets.UTF_8);
+ filePathBytes[i] = bytes;
+ bufferSize += Integer.BYTES;
+ bufferSize += bytes.length;
+ }
+
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
buffer.putLong(lastLogIndex);
buffer.putLong(lastLogTerm);
- buffer.putInt(bytes.length);
- buffer.put(bytes);
+ buffer.putInt(rootBytes.length);
+ buffer.put(rootBytes);
+ buffer.putInt(filePaths.size());
+ for (byte[] relativeFileByte : filePathBytes) {
+ buffer.putInt(relativeFileByte.length);
+ buffer.put(relativeFileByte);
+ }
buffer.flip();
return buffer;
}
@@ -35,12 +74,148 @@ public class DirectorySnapshot extends Snapshot {
int size = buffer.getInt();
byte[] bytes = new byte[size];
buffer.get(bytes);
- directory = new File(new String(bytes));
+ directory = new File(new String(bytes, StandardCharsets.UTF_8));
+
+ int fileNum = buffer.getInt();
+ filePaths = new ArrayList<>(fileNum);
+ for (int i = 0; i < fileNum; i++) {
+ int pathSize = buffer.getInt();
+ bytes = new byte[pathSize];
+ buffer.get(bytes);
+ filePaths.add(new File(new String(bytes, StandardCharsets.UTF_8)).toPath());
+ }
}
@Override
- public void install(RaftMember member) {
- member.getStateMachine().loadSnapshot(directory);
+ public TSStatus install(RaftMember member) {
+ String localSnapshotTmpDirPath = member.getLocalSnapshotTmpDir(directory.getName());
+ TSStatus tsStatus = downloadSnapshot(localSnapshotTmpDirPath, member);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return tsStatus;
+ }
+ member.getStateMachine().loadSnapshot(new File(localSnapshotTmpDirPath));
member.getLogManager().applySnapshot(this);
+ return tsStatus;
+ }
+
+ public void setSource(TEndPoint source) {
+ this.source = source;
+ }
+
+ public void setMemberName(String memberName) {
+ this.memberName = memberName;
+ }
+
+ private TSStatus downloadSnapshot(String localSnapshotTmpDirPath, RaftMember member) {
+ File localSnapshotTmpDir = new File(localSnapshotTmpDirPath);
+ localSnapshotTmpDir.mkdirs();
+ logger.info(
+ "Downloading {} files from {} to {}", filePaths.size(), source, localSnapshotTmpDir);
+ String snapshotName = directory.getName();
+ for (Path path : filePaths) {
+ logger.info("Downloading {} of {}", path, snapshotName);
+ String pathStr = path.toString();
+ String relativePath =
+ pathStr.substring(pathStr.indexOf(snapshotName) + snapshotName.length());
+ String targetFilePath = localSnapshotTmpDirPath + File.separator + relativePath;
+ TSStatus tsStatus = downloadFile(pathStr, targetFilePath, member);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.warn("Downloading failed: {}", tsStatus);
+ return tsStatus;
+ }
+ logger.info("File downloaded");
+ }
+ logger.info("Downloaded {} files from {}", filePaths.size(), source);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus downloadFile(String sourceFilePath, String targetFilePath, RaftMember member) {
+ int pullFileRetry = 5;
+ File targetFile = new File(targetFilePath);
+ targetFile.getParentFile().mkdirs();
+ for (int i = 0; i < pullFileRetry; i++) {
+ try (BufferedOutputStream bufferedOutputStream =
+ new BufferedOutputStream(Files.newOutputStream(targetFile.toPath()))) {
+ downloadFileAsync(source, sourceFilePath, bufferedOutputStream, member);
+
+ if (logger.isInfoEnabled()) {
+ logger.info(
+ "{}: remote file {} is pulled at {}, length: {}",
+ memberName,
+ sourceFilePath,
+ targetFilePath,
+ new File(targetFilePath).length());
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (TException e) {
+ logger.warn(
+ "{}: Cannot pull file {} from {}, wait 5s to retry",
+ memberName,
+ sourceFilePath,
+ source,
+ e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn(
+ "{}: Pulling file {} from {} interrupted", memberName, sourceFilePath, source, e);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+ .setMessage("Interrupted");
+ } catch (IOException e) {
+ logger.warn("{}: Pulling file {} from {} failed", memberName, sourceFilePath, source, e);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+ .setMessage(e.getMessage());
+ }
+
+ try {
+ Files.delete(new File(targetFilePath).toPath());
+ Thread.sleep(5000);
+ } catch (IOException e) {
+ logger.warn("Cannot delete file when pulling {} from {} failed", sourceFilePath, source);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.warn(
+ "{}: Pulling file {} from {} interrupted", memberName, sourceFilePath, source, ex);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+ .setMessage("Interrupted");
+ }
+ // next try
+ }
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+ .setMessage(String.format("Cannot pull file %s from %s", sourceFilePath, source));
+ }
+
+ private void downloadFileAsync(
+ TEndPoint node, String remotePath, OutputStream dest, RaftMember member)
+ throws IOException, TException, InterruptedException {
+ long offset = 0;
+ // TODO-Cluster: use elaborate downloading techniques
+ int fetchSize = 64 * 1024;
+
+ while (true) {
+ AsyncRaftServiceClient client = member.getClient(node);
+ if (client == null) {
+ throw new IOException("No available client for " + node.toString());
+ }
+ ByteBuffer buffer;
+ buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize);
+ int len = writeBuffer(buffer, dest);
+ if (len == 0) {
+ break;
+ }
+ offset += len;
+ }
+ dest.flush();
+ }
+
+ private int writeBuffer(ByteBuffer buffer, OutputStream dest) throws IOException {
+ if (buffer == null || buffer.limit() - buffer.position() == 0) {
+ return 0;
+ }
+
+ // notice: the buffer returned by thrift is a slice of a larger buffer which contains
+ // the whole response, so buffer.position() is not 0 initially and buffer.limit() is
+ // not the size of the downloaded chunk
+ dest.write(buffer.array(), buffer.position() + buffer.arrayOffset(), buffer.remaining());
+ return buffer.remaining();
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
index c2260ce97f..8414ad5483 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import java.nio.ByteBuffer;
@@ -55,7 +56,7 @@ public abstract class Snapshot {
return lastLogTerm;
}
- public abstract void install(RaftMember member);
+ public abstract TSStatus install(RaftMember member);
/**
* Discard contents which is generated by logs whose index <= 'minIndex' if possible. This method
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index 41267531e0..cefecbf9a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.natraft.RaftConsensus;
import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.utils.IOUtils;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -42,6 +43,9 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
private final Logger logger = LoggerFactory.getLogger(RaftRPCServiceProcessor.class);
@@ -74,6 +78,10 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler)
throws TException {
RaftMember member = getMember(request.groupId);
+ logger.info(
+ "Member for election request: {}, request groupId {}",
+ member.getRaftGroupId(),
+ request.getGroupId());
resultHandler.onComplete(member.processElectionRequest(request));
}
@@ -90,11 +98,10 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
}
@Override
- public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler)
+ public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<TSStatus> resultHandler)
throws TException {
RaftMember member = getMember(request.groupId);
- member.installSnapshot(request.snapshotBytes);
- resultHandler.onComplete(null);
+ resultHandler.onComplete(member.installSnapshot(request.snapshotBytes, request.source));
}
@Override
@@ -127,4 +134,14 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
RaftMember member = getMember(groupId);
resultHandler.onComplete(member.requestCommitIndex());
}
+
+ @Override
+ public void readFile(
+ String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ try {
+ resultHandler.onComplete(IOUtils.readFile(filePath, offset, length));
+ } catch (IOException e) {
+ resultHandler.onError(e);
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
index fa232fcbbd..cb1271e6cb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
@@ -22,6 +22,17 @@ package org.apache.iotdb.consensus.natraft.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
@SuppressWarnings("java:S1135")
public class IOUtils {
@@ -38,4 +49,88 @@ public class IOUtils {
}
return curr;
}
+
+ public static List<String> collectRelativePaths(File directory) {
+ if (!directory.exists() || !directory.isDirectory()) {
+ logger.warn("{} is not a directory", directory);
+ return Collections.emptyList();
+ }
+
+ String currentRelativePath = "";
+ List<String> result = new ArrayList<>();
+ collectRelativePaths(directory, currentRelativePath, result);
+ return result;
+ }
+
+ public static void collectRelativePaths(
+ File directory, String currentRelativePath, List<String> result) {
+ File[] files = directory.listFiles();
+ if (files == null) {
+ logger.warn("Cannot list files under {}", directory);
+ return;
+ }
+ for (File file : files) {
+ if (file.isDirectory()) {
+ collectRelativePaths(file, currentRelativePath + File.separator + file.getName(), result);
+ } else {
+ result.add(currentRelativePath + File.separator + file.getName());
+ }
+ }
+ }
+
+ public static List<Path> collectPaths(File directory) {
+ if (!directory.exists() || !directory.isDirectory()) {
+ logger.warn("{} is not a directory", directory);
+ return Collections.emptyList();
+ }
+
+ List<Path> result = new ArrayList<>();
+ collectPaths(directory, result);
+ return result;
+ }
+
+ public static void collectPaths(File directory, List<Path> result) {
+ File[] files = directory.listFiles();
+ if (files == null) {
+ logger.warn("Cannot list files under {}", directory);
+ return;
+ }
+ for (File file : files) {
+ if (file.isDirectory()) {
+ collectPaths(file, result);
+ } else {
+ result.add(file.toPath());
+ }
+ }
+ }
+
+ /**
+ * An interface that is used for a node to pull chunks of files like TsFiles. The file should be a
+ * temporary hard link, and once the file is totally read, it will be removed.
+ */
+ public static ByteBuffer readFile(String filePath, long offset, int length) throws IOException {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ logger.warn("Reading a non-existing snapshot file {}", filePath);
+ return ByteBuffer.allocate(0);
+ }
+
+ ByteBuffer result;
+ int len;
+ try (BufferedInputStream bufferedInputStream =
+ new BufferedInputStream(Files.newInputStream(file.toPath()))) {
+ skipExactly(bufferedInputStream, offset);
+ byte[] bytes = new byte[length];
+ result = ByteBuffer.wrap(bytes);
+ len = bufferedInputStream.read(bytes);
+ result.limit(Math.max(len, 0));
+ }
+ return result;
+ }
+
+ private static void skipExactly(InputStream stream, long byteToSkip) throws IOException {
+ while (byteToSkip > 0) {
+ byteToSkip -= stream.skip(byteToSkip);
+ }
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 175d27802d..89252624b5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -50,6 +50,7 @@ public enum TSStatusCode {
TIME_OUT(307),
NO_LEADER(308),
READ_ONLY(309),
+ SNAPSHOT_INSTALLING(310),
// Client,
REDIRECTION_RECOMMEND(400),
diff --git a/thrift-raft/pom.xml b/thrift-raft/pom.xml
index 8a056dc73a..45994aeaa9 100644
--- a/thrift-raft/pom.xml
+++ b/thrift-raft/pom.xml
@@ -27,7 +27,7 @@
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>thrift-raft-consensus</artifactId>
+ <artifactId>iotdb-thrift-raft-consensus</artifactId>
<name>rpc-thrift-raft-consensus</name>
<description>Rpc modules for raft consensus algorithm</description>
<dependencies>
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index 419bf2f68e..a518a239e0 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -100,6 +100,7 @@ struct SendSnapshotRequest {
1: required binary snapshotBytes
// for data group
2: required common.TConsensusGroupId groupId
+ 3: required common.TEndPoint source
}
struct RequestCommitIndexResponse {
@@ -150,7 +151,7 @@ service RaftService {
**/
AppendEntryResult appendEntries(1:AppendEntriesRequest request)
- void sendSnapshot(1:SendSnapshotRequest request)
+ common.TSStatus sendSnapshot(1:SendSnapshotRequest request)
/**
* Test if a log of "index" and "term" exists.
@@ -169,4 +170,10 @@ service RaftService {
* leader.
**/
RequestCommitIndexResponse requestCommitIndex(1:common.TConsensusGroupId groupId)
+
+ /**
+ * Read a chunk of a file from the client. If the remaining of the file does not have enough
+ * bytes, only the remaining will be returned.
+ **/
+ binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
}
\ No newline at end of file