You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/27 01:04:19 UTC

[iotdb] branch native_raft updated: fix snapshot catch-up

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 522244b985 fix snapshot catch-up
522244b985 is described below

commit 522244b985443d5f276184ffa6307cfff98bd835
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Feb 27 09:05:42 2023 +0800

    fix snapshot catch-up
---
 .gitignore                                         |   2 +
 consensus/pom.xml                                  |   2 +-
 .../org/apache/iotdb/consensus/common/Utils.java   |   6 +
 .../iotdb/consensus/natraft/RaftConsensus.java     |   4 +-
 .../natraft/client/SyncClientAdaptor.java          |  10 ++
 .../consensus/natraft/protocol/RaftConfig.java     |   2 +-
 .../consensus/natraft/protocol/RaftMember.java     |  35 +++-
 .../protocol/heartbeat/ElectionReqHandler.java     |  41 +++--
 .../protocol/heartbeat/HeartbeatReqHandler.java    |  19 ++
 .../protocol/heartbeat/HeartbeatRespHandler.java   |   2 +-
 .../natraft/protocol/log/catchup/CatchUpTask.java  |  12 +-
 .../log/catchup/SnapshotCatchUpHandler.java        |  14 +-
 .../protocol/log/catchup/SnapshotCatchUpTask.java  |  30 ++--
 .../manager/DirectorySnapshotRaftLogManager.java   |  20 ++-
 .../protocol/log/manager/RaftLogManager.java       |  16 +-
 .../protocol/log/snapshot/DirectorySnapshot.java   | 191 ++++++++++++++++++++-
 .../natraft/protocol/log/snapshot/Snapshot.java    |   3 +-
 .../natraft/service/RaftRPCServiceProcessor.java   |  23 ++-
 .../iotdb/consensus/natraft/utils/IOUtils.java     |  95 ++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 thrift-raft/pom.xml                                |   2 +-
 thrift-raft/src/main/thrift/raft.thrift            |   9 +-
 22 files changed, 471 insertions(+), 68 deletions(-)

diff --git a/.gitignore b/.gitignore
index ee4b9d2f73..afcbcf77ef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,3 +123,5 @@ node3/
 antlr/gen/
 antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/gen/
 antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.tokens
+distribution/test/
+distribution/distribute-fit.sh
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 89975e7bba..fb72d61faf 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -67,7 +67,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>thrift-raft-consensus</artifactId>
+            <artifactId>iotdb-thrift-raft-consensus</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
index 5803157ea5..d2dbd5c5dc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.consensus.common;
 
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,10 @@ import java.util.List;
 public class Utils {
   private static final Logger logger = LoggerFactory.getLogger(Utils.class);
 
+  public static String buildPeerDir(File storageDir, ConsensusGroupId groupId) {
+    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
+  }
+
   public static List<Path> listAllRegularFilesRecursively(File rootDir) {
     List<Path> allFiles = new ArrayList<>();
     try {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 81fe9ebeb0..03ff338c79 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -111,6 +111,7 @@ public class RaftConsensus implements IConsensus {
     } else {
       try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
         for (Path path : stream) {
+          logger.info("Recovering a RaftMember from {}", path);
           Path fileName = path.getFileName();
           String[] items = fileName.toString().split("_");
           if (items.length != 2) {
@@ -121,6 +122,7 @@ public class RaftConsensus implements IConsensus {
                   Integer.parseInt(items[0]), Integer.parseInt(items[1]));
           RaftMember raftMember =
               new RaftMember(
+                  path.toString(),
                   config,
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   new ArrayList<>(),
@@ -202,7 +204,7 @@ public class RaftConsensus implements IConsensus {
           }
           RaftMember impl =
               new RaftMember(
-                  config, thisPeer, peers, groupId, registry.apply(groupId), clientManager);
+                  path, config, thisPeer, peers, groupId, registry.apply(groupId), clientManager);
           impl.start();
           return impl;
         });
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index 15ffa4aa67..fef5e19c26 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -18,6 +18,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -82,6 +83,15 @@ public class SyncClientAdaptor {
     return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
   }
 
+  public static ByteBuffer readFile(
+      AsyncRaftServiceClient client, String remotePath, long offset, int fetchSize)
+      throws InterruptedException, TException {
+    GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getEndpoint());
+
+    client.readFile(remotePath, offset, fetchSize, handler);
+    return handler.getResult(config.getConnectionTimeoutInMS());
+  }
+
   public static void setConfig(RaftConfig config) {
     SyncClientAdaptor.config = config;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 7bc82ed8f9..b29d51c951 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -93,7 +93,7 @@ public class RaftConfig {
   private RPCConfig rpcConfig;
 
   public RaftConfig(ConsensusConfig config) {
-    this.storageDir = config.getStorageDir() + File.separator + "system";
+    this.storageDir = config.getStorageDir();
     new File(this.storageDir).mkdirs();
     this.rpcConfig = config.getRPCConfig();
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 97ad720f88..e0019b89c5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -187,6 +187,7 @@ public class RaftMember {
   private FlowBalancer flowBalancer;
 
   public RaftMember(
+      String storageDir,
       RaftConfig config,
       Peer thisNode,
       List<Peer> allNodes,
@@ -194,6 +195,7 @@ public class RaftMember {
       IStateMachine stateMachine,
       IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager) {
     this.config = config;
+    this.storageDir = storageDir;
     initConfig();
 
     this.thisNode = thisNode;
@@ -252,7 +254,7 @@ public class RaftMember {
       for (int i = 0; i < size; i++) {
         allNodes.add(Peer.deserialize(buffer));
       }
-      logger.info("Recover IoTConsensus server Impl, configuration: {}", allNodes);
+      logger.info("{}: Recover IoTConsensus server Impl, configuration: {}", name, allNodes);
     } catch (IOException e) {
       logger.error("Unexpected error occurs when recovering configuration", e);
     }
@@ -304,7 +306,6 @@ public class RaftMember {
 
   private void initConfig() {
     this.enableWeakAcceptance = config.isEnableWeakAcceptance();
-    this.storageDir = config.getStorageDir();
   }
 
   public void initPeerMap() {
@@ -1006,10 +1007,32 @@ public class RaftMember {
     }
   }
 
-  public void installSnapshot(ByteBuffer snapshotBytes) {
-    DirectorySnapshot directorySnapshot = new DirectorySnapshot(null);
-    directorySnapshot.deserialize(snapshotBytes);
-    directorySnapshot.install(this);
+  public String getLocalSnapshotTmpDir(String remoteSnapshotDirName) {
+    return config.getStorageDir()
+        + File.separator
+        + groupId
+        + File.separator
+        + "remote_snapshot"
+        + File.separator
+        + remoteSnapshotDirName;
+  }
+
+  public TSStatus installSnapshot(ByteBuffer snapshotBytes, TEndPoint source) {
+    if (!snapshotApplyLock.tryLock()) {
+      return new TSStatus(TSStatusCode.SNAPSHOT_INSTALLING.getStatusCode());
+    }
+
+    DirectorySnapshot directorySnapshot;
+    try {
+      directorySnapshot = new DirectorySnapshot(null, null);
+      directorySnapshot.deserialize(snapshotBytes);
+      directorySnapshot.setSource(source);
+      directorySnapshot.setMemberName(name);
+      directorySnapshot.install(this);
+    } finally {
+      snapshotApplyLock.unlock();
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   public boolean containsNode(Peer node) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
index a39a9524b3..cb1e74172b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
@@ -42,12 +42,26 @@ public class ElectionReqHandler {
   }
 
   public long processElectionRequest(ElectionRequest electionRequest) {
-    if (logger.isDebugEnabled()) {
-      logger.debug(
-          "{}: start to handle request from elector {}",
+    TEndPoint candidate = electionRequest.getElector();
+    Peer peer =
+        new Peer(
+            Factory.createFromTConsensusGroupId(electionRequest.groupId),
+            electionRequest.electorId,
+            candidate);
+    // check if the node is in the group
+    if (!member.containsNode(peer)) {
+      logger.info(
+          "{}: the elector {} is not in the data group {}, so reject this election.",
           member.getName(),
-          electionRequest.getElector());
+          peer,
+          member.getAllNodes());
+      return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to handle request from elector {}", member.getName(), peer);
     }
+
     long currentTerm = member.getStatus().getTerm().get();
     long response =
         checkElectorTerm(currentTerm, electionRequest.getTerm(), electionRequest.getElector());
@@ -56,7 +70,7 @@ public class ElectionReqHandler {
     }
 
     // compare the log progress of the elector with this node
-    response = checkElectorLogProgress(electionRequest);
+    response = checkElectorLogProgress(electionRequest, peer);
     logger.info(
         "{} sending response {} to the elector {}",
         member.getName(),
@@ -109,22 +123,7 @@ public class ElectionReqHandler {
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
    *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
-  long checkElectorLogProgress(ElectionRequest electionRequest) {
-    TEndPoint candidate = electionRequest.getElector();
-    Peer peer =
-        new Peer(
-            Factory.createFromTConsensusGroupId(electionRequest.groupId),
-            electionRequest.electorId,
-            candidate);
-    // check if the node is in the group
-    if (!member.containsNode(peer)) {
-      logger.info(
-          "{}: the elector {} is not in the data group {}, so reject this election.",
-          member.getName(),
-          member.getAllNodes(),
-          candidate);
-      return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
-    }
+  long checkElectorLogProgress(ElectionRequest electionRequest, Peer peer) {
 
     long thatTerm = electionRequest.getTerm();
     long thatLastLogIndex = electionRequest.getLastLogIndex();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
index 55d63af7f2..615a34224b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
 
 import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
index ae13851b02..326c5367f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
@@ -116,7 +116,7 @@ public class HeartbeatRespHandler implements AsyncMethodCallback<HeartBeatRespon
       if (lastLogIdx == peerInfo.getLastHeartBeatIndex() && !resp.isInstallingSnapshot()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peerInfo.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 5000) {
+        if (inconsistentNum >= 5) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
index 11292d0b26..e7ae9e2dca 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
@@ -302,7 +302,12 @@ public class CatchUpTask implements Runnable {
     raftMember.getLogManager().takeSnapshot();
     snapshot = raftMember.getLogManager().getSnapshot(peerInfo.getMatchIndex());
     if (logger.isInfoEnabled()) {
-      logger.info("{}: Logs in {} are too old, catch up with snapshot", raftMember.getName(), node);
+      logger.info(
+          "{}: Logs in {} are too old, catch up with snapshot {}-{}",
+          raftMember.getName(),
+          node,
+          snapshot.getLastLogIndex(),
+          snapshot.getLastLogTerm());
     }
   }
 
@@ -349,7 +354,7 @@ public class CatchUpTask implements Runnable {
         catchUpSucceeded = task.call();
       }
       if (catchUpSucceeded) {
-        // the catch up may be triggered by an old heartbeat, and the node may have already
+        // the catch-up may be triggered by an old heartbeat, and the node may have already
         // caught up, so logs can be empty
         if (!logs.isEmpty() || snapshot != null) {
           long lastIndex =
@@ -367,6 +372,9 @@ public class CatchUpTask implements Runnable {
               System.currentTimeMillis() - startTime);
         }
         peerInfo.resetInconsistentHeartbeatNum();
+      } else {
+        // wait for a while before the next catch-up so that the status of the follower may update
+        Thread.sleep(5000);
       }
 
     } catch (LeaderUnknownException e) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
index 56af324502..05bd80e680 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
 
@@ -26,27 +27,28 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /** SnapshotCatchUpHandler receives the result of sending a snapshot to a stale node. */
-public class SnapshotCatchUpHandler implements AsyncMethodCallback<Void> {
+public class SnapshotCatchUpHandler implements AsyncMethodCallback<TSStatus> {
 
   private static final Logger logger = LoggerFactory.getLogger(SnapshotCatchUpHandler.class);
 
-  private AtomicBoolean succeed;
+  private AtomicReference<TSStatus> succeed;
   private Peer receiver;
   private Snapshot snapshot;
 
-  public SnapshotCatchUpHandler(AtomicBoolean succeed, Peer receiver, Snapshot snapshot) {
+  public SnapshotCatchUpHandler(
+      AtomicReference<TSStatus> succeed, Peer receiver, Snapshot snapshot) {
     this.succeed = succeed;
     this.receiver = receiver;
     this.snapshot = snapshot;
   }
 
   @Override
-  public void onComplete(Void resp) {
+  public void onComplete(TSStatus resp) {
     synchronized (succeed) {
-      succeed.set(true);
+      succeed.set(resp);
       succeed.notifyAll();
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
index 437c25d645..f8161e4db8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
 import org.apache.iotdb.consensus.natraft.exception.LeaderUnknownException;
@@ -27,6 +28,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
 import org.apache.iotdb.consensus.raft.thrift.SendSnapshotRequest;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -35,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * SnapshotCatchUpTask first sends the snapshot to the stale node then sends the logs to the node.
@@ -62,6 +64,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
   private void doSnapshotCatchUp() throws TException, InterruptedException, LeaderUnknownException {
     SendSnapshotRequest request = new SendSnapshotRequest();
     request.setGroupId(raftMember.getRaftGroupId().convertToTConsensusGroupId());
+    request.setSource(raftMember.getThisNode().getEndpoint());
     logger.info("Start to send snapshot to {}", node);
     ByteBuffer data = snapshot.serialize();
     if (logger.isInfoEnabled()) {
@@ -76,34 +79,39 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
       }
     }
 
-    abort = !sendSnapshotAsync(request);
+    TSStatus tsStatus = sendSnapshotAsync(request);
+    if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      abort = true;
+      logger.warn("Failed to send snapshot to {}: {}", node, tsStatus);
+    }
   }
 
   @SuppressWarnings("java:S2274") // enable timeout
-  private boolean sendSnapshotAsync(SendSnapshotRequest request)
+  private TSStatus sendSnapshotAsync(SendSnapshotRequest request)
       throws TException, InterruptedException {
-    AtomicBoolean succeed = new AtomicBoolean(false);
-    SnapshotCatchUpHandler handler = new SnapshotCatchUpHandler(succeed, node, snapshot);
+    AtomicReference<TSStatus> result =
+        new AtomicReference<>(new TSStatus(TSStatusCode.TIME_OUT.getStatusCode()));
+    SnapshotCatchUpHandler handler = new SnapshotCatchUpHandler(result, node, snapshot);
     AsyncRaftServiceClient client = raftMember.getClient(node.getEndpoint());
     if (client == null) {
       logger.info("{}: client null for node {}", raftMember.getThisNode(), node);
       abort = true;
-      return false;
+      return result.get();
     }
 
     logger.info(
         "{}: the snapshot request size={}",
         raftMember.getName(),
         request.getSnapshotBytes().length);
-    synchronized (succeed) {
+    synchronized (result) {
       client.sendSnapshot(request, handler);
       catchUpManager.registerTask(node);
-      succeed.wait(sendSnapshotWaitMs);
+      result.wait(sendSnapshotWaitMs);
     }
     if (logger.isInfoEnabled()) {
-      logger.info("send snapshot to node {} success {}", raftMember.getThisNode(), succeed.get());
+      logger.info("send snapshot to node {} success {}", raftMember.getThisNode(), result.get());
     }
-    return succeed.get();
+    return result.get();
   }
 
   @Override
@@ -122,7 +130,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
     } else {
       logger.warn("{}: Log catch up {} failed", raftMember.getName(), node);
     }
-    // the next catch up is enabled
+    // the next catch-up is enabled
     catchUpManager.unregisterTask(node);
     return !abort;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index c353d1d8cc..d6a032d32a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -25,12 +25,18 @@ import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
 import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
+import org.apache.iotdb.consensus.natraft.utils.IOUtils;
 
 import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
 
 public class DirectorySnapshotRaftLogManager extends RaftLogManager {
 
   private File latestSnapshotDir;
+  private long snapshotIndex;
+  private long snapshotTerm;
+  DirectorySnapshot directorySnapshot;
 
   public DirectorySnapshotRaftLogManager(
       StableEntryManager stableEntryManager,
@@ -43,7 +49,7 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
 
   @Override
   public Snapshot getSnapshot(long minLogIndex) {
-    return new DirectorySnapshot(latestSnapshotDir);
+    return directorySnapshot;
   }
 
   @Override
@@ -55,6 +61,18 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
                 + getName()
                 + "-snapshot-"
                 + System.currentTimeMillis());
+    try {
+      lock.readLock().lock();
+      snapshotIndex = getAppliedIndex();
+      snapshotTerm = getAppliedTerm();
+    } finally {
+      lock.readLock().unlock();
+    }
     stateMachine.takeSnapshot(latestSnapshotDir);
+    List<Path> snapshotFiles = stateMachine.getSnapshotFiles(latestSnapshotDir);
+    snapshotFiles.addAll(IOUtils.collectPaths(latestSnapshotDir));
+    directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles);
+    directorySnapshot.setLastLogIndex(snapshotIndex);
+    directorySnapshot.setLastLogTerm(snapshotTerm);
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 342c63e9fe..682bec242a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -752,6 +752,10 @@ public abstract class RaftLogManager {
     return appliedIndex;
   }
 
+  public long getAppliedTerm() {
+    return appliedTerm;
+  }
+
   /** check whether delete the committed log */
   void checkDeleteLog() {
     try {
@@ -881,10 +885,16 @@ public abstract class RaftLogManager {
           }
         }
       }
-      synchronized (changeApplyCommitIndexCond) {
-        // maxHaveAppliedCommitIndex may change if a snapshot is applied concurrently
-        appliedIndex = Math.max(appliedIndex, nextToCheckIndex);
+      if (nextToCheckIndex > appliedIndex) {
+        synchronized (changeApplyCommitIndexCond) {
+          // maxHaveAppliedCommitIndex may change if a snapshot is applied concurrently
+          if (nextToCheckIndex > appliedIndex) {
+            appliedTerm = log.getCurrLogTerm();
+            appliedIndex = nextToCheckIndex;
+          }
+        }
       }
+
       logger.debug(
           "{}: log={} is applied, nextToCheckIndex={}, commitIndex={}, maxHaveAppliedCommitIndex={}",
           name,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index c218104c88..d1fd917684 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -4,26 +4,65 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
+import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
 
 public class DirectorySnapshot extends Snapshot {
+
+  private static final Logger logger = LoggerFactory.getLogger(DirectorySnapshot.class);
   private File directory;
+  private List<Path> filePaths;
+  private TEndPoint source;
+  private String memberName;
 
-  public DirectorySnapshot(File directory) {
+  public DirectorySnapshot(File directory, List<Path> filePaths) {
     this.directory = directory;
+    this.filePaths = filePaths;
   }
 
   @Override
   public ByteBuffer serialize() {
-    byte[] bytes = directory.getAbsolutePath().getBytes();
-    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2 + bytes.length + Integer.BYTES);
+
+    byte[] rootBytes = directory.getAbsolutePath().getBytes(StandardCharsets.UTF_8);
+    int bufferSize = Long.BYTES * 2 + rootBytes.length + Integer.BYTES * 2;
+    byte[][] filePathBytes = new byte[filePaths.size()][];
+    for (int i = 0; i < filePaths.size(); i++) {
+      Path path = filePaths.get(i);
+      byte[] bytes = path.toString().getBytes(StandardCharsets.UTF_8);
+      filePathBytes[i] = bytes;
+      bufferSize += Integer.BYTES;
+      bufferSize += bytes.length;
+    }
+
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
     buffer.putLong(lastLogIndex);
     buffer.putLong(lastLogTerm);
-    buffer.putInt(bytes.length);
-    buffer.put(bytes);
+    buffer.putInt(rootBytes.length);
+    buffer.put(rootBytes);
+    buffer.putInt(filePaths.size());
+    for (byte[] relativeFileByte : filePathBytes) {
+      buffer.putInt(relativeFileByte.length);
+      buffer.put(relativeFileByte);
+    }
     buffer.flip();
     return buffer;
   }
@@ -35,12 +74,148 @@ public class DirectorySnapshot extends Snapshot {
     int size = buffer.getInt();
     byte[] bytes = new byte[size];
     buffer.get(bytes);
-    directory = new File(new String(bytes));
+    directory = new File(new String(bytes, StandardCharsets.UTF_8));
+
+    int fileNum = buffer.getInt();
+    filePaths = new ArrayList<>(fileNum);
+    for (int i = 0; i < fileNum; i++) {
+      int pathSize = buffer.getInt();
+      bytes = new byte[pathSize];
+      buffer.get(bytes);
+      filePaths.add(new File(new String(bytes, StandardCharsets.UTF_8)).toPath());
+    }
   }
 
   @Override
-  public void install(RaftMember member) {
-    member.getStateMachine().loadSnapshot(directory);
+  public TSStatus install(RaftMember member) {
+    String localSnapshotTmpDirPath = member.getLocalSnapshotTmpDir(directory.getName());
+    TSStatus tsStatus = downloadSnapshot(localSnapshotTmpDirPath, member);
+    if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return tsStatus;
+    }
+    member.getStateMachine().loadSnapshot(new File(localSnapshotTmpDirPath));
     member.getLogManager().applySnapshot(this);
+    return tsStatus;
+  }
+
+  public void setSource(TEndPoint source) {
+    this.source = source;
+  }
+
+  public void setMemberName(String memberName) {
+    this.memberName = memberName;
+  }
+
+  private TSStatus downloadSnapshot(String localSnapshotTmpDirPath, RaftMember member) {
+    File localSnapshotTmpDir = new File(localSnapshotTmpDirPath);
+    localSnapshotTmpDir.mkdirs();
+    logger.info(
+        "Downloading {} files from {} to {}", filePaths.size(), source, localSnapshotTmpDir);
+    String snapshotName = directory.getName();
+    for (Path path : filePaths) {
+      logger.info("Downloading {} of {}", path, snapshotName);
+      String pathStr = path.toString();
+      String relativePath =
+          pathStr.substring(pathStr.indexOf(snapshotName) + snapshotName.length());
+      String targetFilePath = localSnapshotTmpDirPath + File.separator + relativePath;
+      TSStatus tsStatus = downloadFile(pathStr, targetFilePath, member);
+      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.warn("Downloading failed: {}", tsStatus);
+        return tsStatus;
+      }
+      logger.info("File downloaded");
+    }
+    logger.info("Downloaded {} files from {}", filePaths.size(), source);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  private TSStatus downloadFile(String sourceFilePath, String targetFilePath, RaftMember member) {
+    int pullFileRetry = 5;
+    File targetFile = new File(targetFilePath);
+    targetFile.getParentFile().mkdirs();
+    for (int i = 0; i < pullFileRetry; i++) {
+      try (BufferedOutputStream bufferedOutputStream =
+          new BufferedOutputStream(Files.newOutputStream(targetFile.toPath()))) {
+        downloadFileAsync(source, sourceFilePath, bufferedOutputStream, member);
+
+        if (logger.isInfoEnabled()) {
+          logger.info(
+              "{}: remote file {} is pulled at {}, length: {}",
+              memberName,
+              sourceFilePath,
+              targetFilePath,
+              new File(targetFilePath).length());
+        }
+        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      } catch (TException e) {
+        logger.warn(
+            "{}: Cannot pull file {} from {}, wait 5s to retry",
+            memberName,
+            sourceFilePath,
+            source,
+            e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn(
+            "{}: Pulling file {} from {} interrupted", memberName, sourceFilePath, source, e);
+        return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+            .setMessage("Interrupted");
+      } catch (IOException e) {
+        logger.warn("{}: Pulling file {} from {} failed", memberName, sourceFilePath, source, e);
+        return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+            .setMessage(e.getMessage());
+      }
+
+      try {
+        Files.delete(new File(targetFilePath).toPath());
+        Thread.sleep(5000);
+      } catch (IOException e) {
+        logger.warn("Cannot delete file when pulling {} from {} failed", sourceFilePath, source);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        logger.warn(
+            "{}: Pulling file {} from {} interrupted", memberName, sourceFilePath, source, ex);
+        return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+            .setMessage("Interrupted");
+      }
+      // next try
+    }
+    return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+        .setMessage(String.format("Cannot pull file %s from %s", sourceFilePath, source));
+  }
+
+  private void downloadFileAsync(
+      TEndPoint node, String remotePath, OutputStream dest, RaftMember member)
+      throws IOException, TException, InterruptedException {
+    long offset = 0;
+    // TODO-Cluster: use elaborate downloading techniques
+    int fetchSize = 64 * 1024;
+
+    while (true) {
+      AsyncRaftServiceClient client = member.getClient(node);
+      if (client == null) {
+        throw new IOException("No available client for " + node.toString());
+      }
+      ByteBuffer buffer;
+      buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize);
+      int len = writeBuffer(buffer, dest);
+      if (len == 0) {
+        break;
+      }
+      offset += len;
+    }
+    dest.flush();
+  }
+
+  private int writeBuffer(ByteBuffer buffer, OutputStream dest) throws IOException {
+    if (buffer == null || buffer.limit() - buffer.position() == 0) {
+      return 0;
+    }
+
+    // notice: the buffer returned by thrift is a slice of a larger buffer which contains
+    // the whole response, so buffer.position() is not 0 initially and buffer.limit() is
+    // not the size of the downloaded chunk
+    dest.write(buffer.array(), buffer.position() + buffer.arrayOffset(), buffer.remaining());
+    return buffer.remaining();
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
index c2260ce97f..8414ad5483 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 
 import java.nio.ByteBuffer;
@@ -55,7 +56,7 @@ public abstract class Snapshot {
     return lastLogTerm;
   }
 
-  public abstract void install(RaftMember member);
+  public abstract TSStatus install(RaftMember member);
 
   /**
    * Discard contents which is generated by logs whose index <= 'minIndex' if possible. This method
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index 41267531e0..cefecbf9a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.natraft.RaftConsensus;
 import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.utils.IOUtils;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -42,6 +43,9 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
 
   private final Logger logger = LoggerFactory.getLogger(RaftRPCServiceProcessor.class);
@@ -74,6 +78,10 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
   public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler)
       throws TException {
     RaftMember member = getMember(request.groupId);
+    logger.info(
+        "Member for election request: {}, request groupId {}",
+        member.getRaftGroupId(),
+        request.getGroupId());
     resultHandler.onComplete(member.processElectionRequest(request));
   }
 
@@ -90,11 +98,10 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
   }
 
   @Override
-  public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler)
+  public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<TSStatus> resultHandler)
       throws TException {
     RaftMember member = getMember(request.groupId);
-    member.installSnapshot(request.snapshotBytes);
-    resultHandler.onComplete(null);
+    resultHandler.onComplete(member.installSnapshot(request.snapshotBytes, request.source));
   }
 
   @Override
@@ -127,4 +134,14 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
     RaftMember member = getMember(groupId);
     resultHandler.onComplete(member.requestCommitIndex());
   }
+
+  @Override
+  public void readFile(
+      String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
+    try {
+      resultHandler.onComplete(IOUtils.readFile(filePath, offset, length));
+    } catch (IOException e) {
+      resultHandler.onError(e);
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
index fa232fcbbd..cb1271e6cb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
@@ -22,6 +22,17 @@ package org.apache.iotdb.consensus.natraft.utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 @SuppressWarnings("java:S1135")
 public class IOUtils {
 
@@ -38,4 +49,88 @@ public class IOUtils {
     }
     return curr;
   }
+
+  public static List<String> collectRelativePaths(File directory) {
+    if (!directory.exists() || !directory.isDirectory()) {
+      logger.warn("{} is not a directory", directory);
+      return Collections.emptyList();
+    }
+
+    String currentRelativePath = "";
+    List<String> result = new ArrayList<>();
+    collectRelativePaths(directory, currentRelativePath, result);
+    return result;
+  }
+
+  public static void collectRelativePaths(
+      File directory, String currentRelativePath, List<String> result) {
+    File[] files = directory.listFiles();
+    if (files == null) {
+      logger.warn("Cannot list files under {}", directory);
+      return;
+    }
+    for (File file : files) {
+      if (file.isDirectory()) {
+        collectRelativePaths(file, currentRelativePath + File.separator + file.getName(), result);
+      } else {
+        result.add(currentRelativePath + File.separator + file.getName());
+      }
+    }
+  }
+
+  public static List<Path> collectPaths(File directory) {
+    if (!directory.exists() || !directory.isDirectory()) {
+      logger.warn("{} is not a directory", directory);
+      return Collections.emptyList();
+    }
+
+    List<Path> result = new ArrayList<>();
+    collectPaths(directory, result);
+    return result;
+  }
+
+  public static void collectPaths(File directory, List<Path> result) {
+    File[] files = directory.listFiles();
+    if (files == null) {
+      logger.warn("Cannot list files under {}", directory);
+      return;
+    }
+    for (File file : files) {
+      if (file.isDirectory()) {
+        collectPaths(file, result);
+      } else {
+        result.add(file.toPath());
+      }
+    }
+  }
+
+  /**
+   * An interface that is used for a node to pull chunks of files like TsFiles. The file should be a
+   * temporary hard link, and once the file is totally read, it will be removed.
+   */
+  public static ByteBuffer readFile(String filePath, long offset, int length) throws IOException {
+    File file = new File(filePath);
+    if (!file.exists()) {
+      logger.warn("Reading a non-existing snapshot file {}", filePath);
+      return ByteBuffer.allocate(0);
+    }
+
+    ByteBuffer result;
+    int len;
+    try (BufferedInputStream bufferedInputStream =
+        new BufferedInputStream(Files.newInputStream(file.toPath()))) {
+      skipExactly(bufferedInputStream, offset);
+      byte[] bytes = new byte[length];
+      result = ByteBuffer.wrap(bytes);
+      len = bufferedInputStream.read(bytes);
+      result.limit(Math.max(len, 0));
+    }
+    return result;
+  }
+
+  private static void skipExactly(InputStream stream, long byteToSkip) throws IOException {
+    while (byteToSkip > 0) {
+      byteToSkip -= stream.skip(byteToSkip);
+    }
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 175d27802d..89252624b5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -50,6 +50,7 @@ public enum TSStatusCode {
   TIME_OUT(307),
   NO_LEADER(308),
   READ_ONLY(309),
+  SNAPSHOT_INSTALLING(310),
 
   // Client,
   REDIRECTION_RECOMMEND(400),
diff --git a/thrift-raft/pom.xml b/thrift-raft/pom.xml
index 8a056dc73a..45994aeaa9 100644
--- a/thrift-raft/pom.xml
+++ b/thrift-raft/pom.xml
@@ -27,7 +27,7 @@
         <version>1.1.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <artifactId>thrift-raft-consensus</artifactId>
+    <artifactId>iotdb-thrift-raft-consensus</artifactId>
     <name>rpc-thrift-raft-consensus</name>
     <description>Rpc modules for raft consensus algorithm</description>
     <dependencies>
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index 419bf2f68e..a518a239e0 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -100,6 +100,7 @@ struct SendSnapshotRequest {
   1: required binary snapshotBytes
   // for data group
   2: required common.TConsensusGroupId groupId
+  3: required common.TEndPoint source
 }
 
 struct RequestCommitIndexResponse {
@@ -150,7 +151,7 @@ service RaftService {
   **/
   AppendEntryResult appendEntries(1:AppendEntriesRequest request)
 
-  void sendSnapshot(1:SendSnapshotRequest request)
+  common.TSStatus sendSnapshot(1:SendSnapshotRequest request)
 
   /**
   * Test if a log of "index" and "term" exists.
@@ -169,4 +170,10 @@ service RaftService {
     * leader.
     **/
     RequestCommitIndexResponse requestCommitIndex(1:common.TConsensusGroupId groupId)
+
+    /**
+      * Read a chunk of a file from the client. If the remaining of the file does not have enough
+      * bytes, only the remaining will be returned.
+      **/
+      binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
 }
\ No newline at end of file