You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/04 06:24:39 UTC

[incubator-ratis] branch master updated: RATIS-1200. Refactor LogAppender.SnapshotRequestIter. (#319)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 86dd7fa  RATIS-1200. Refactor LogAppender.SnapshotRequestIter. (#319)
86dd7fa is described below

commit 86dd7fa68081e33428ed65dfdb613032b1378560
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Dec 4 14:24:32 2020 +0800

    RATIS-1200. Refactor LogAppender.SnapshotRequestIter. (#319)
    
    * RATIS-1200. Refactor LogAppender.SnapshotRequestIter.
    
    * Fix a bug and checkstyle
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   5 +-
 .../java/org/apache/ratis/server/RaftServer.java   |  12 +-
 .../ratis/server/impl/InstallSnapshotRequests.java | 137 +++++++++++++++++++++
 .../apache/ratis/server/impl/LeaderElection.java   |   4 +-
 .../org/apache/ratis/server/impl/LogAppender.java  | 129 ++-----------------
 .../apache/ratis/server/impl/RaftServerImpl.java   |  39 ++----
 .../ratis/server/storage/FileChunkReader.java      |  91 ++++++++++++++
 .../apache/ratis/statemachine/SnapshotInfo.java    |  13 +-
 .../statemachine/impl/FileListSnapshotInfo.java    |  18 +--
 .../statemachine/impl/SingleFileSnapshotInfo.java  |   6 +-
 .../ratis/datastream/DataStreamBaseTest.java       |   9 +-
 11 files changed, 283 insertions(+), 180 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 9bbf9fe..38a2e41 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -477,8 +477,7 @@ public class GrpcLogAppender extends LogAppender {
     final String requestId = UUID.randomUUID().toString();
     try {
       snapshotRequestObserver = getClient().installSnapshot(responseHandler);
-      for (InstallSnapshotRequestProto request :
-          new SnapshotRequestIter(snapshot, requestId)) {
+      for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isAppenderRunning()) {
           snapshotRequestObserver.onNext(request);
           getFollower().updateLastRpcSendTime();
@@ -524,7 +523,7 @@ public class GrpcLogAppender extends LogAppender {
     final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
     StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
     // prepare and enqueue the notify install snapshot request.
-    final InstallSnapshotRequestProto request = createInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
+    final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
     if (LOG.isInfoEnabled()) {
       LOG.info("{}: send {}", this, ServerProtoUtils.toString(request));
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index c4b9b0f..d6e267b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -23,10 +23,12 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcFactory;
 import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
@@ -67,7 +69,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
     DivisionInfo getInfo();
 
     /** @return the {@link RaftGroup} for this division. */
-    RaftGroup getGroup();
+    default RaftGroup getGroup() {
+      return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
+    }
+
+    /** @return the current {@link RaftConfiguration} for this division. */
+    RaftConfiguration getRaftConf();
 
     /** @return the {@link RaftServer} containing this division. */
     RaftServer getRaftServer();
@@ -78,6 +85,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
     /** @return the raft log of this division. */
     RaftLog getRaftLog();
 
+    /** @return the storage of this division. */
+    RaftStorage getRaftStorage();
+
     /** @return the data stream map of this division. */
     DataStreamMap getDataStreamMap();
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
new file mode 100644
index 0000000..448770a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.storage.FileChunkReader;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An {@link Iterable} of {@link InstallSnapshotRequestProto} for sending a snapshot.
+ *
+ * The snapshot is sent by one or more requests, where
+ * a snapshot has one or more files, and
+ * a file is sent by one or more chunks.
+ * The number of requests is equal to the sum of the numbers of chunks of each file.
+ */
+class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
+  private final RaftServer.Division server;
+  private final RaftPeerId followerId;
+
+  /** The snapshot to be sent. */
+  private final SnapshotInfo snapshot;
+  /** A fixed id for all the requests. */
+  private final String requestId;
+
+  /** Maximum chunk size. */
+  private final int snapshotChunkMaxSize;
+
+  /** The index of the current request. */
+  private int requestIndex = 0;
+
+  /** The index of the current file. */
+  private int fileIndex = 0;
+  /** The reader of the current file; null when no file is currently open. */
+  private FileChunkReader current;
+
+  InstallSnapshotRequests(RaftServer.Division server, RaftPeerId followerId,
+      String requestId, SnapshotInfo snapshot, int snapshotChunkMaxSize) {
+    this.server = server;
+    this.followerId = followerId;
+    this.requestId = requestId;
+    this.snapshot = snapshot;
+    this.snapshotChunkMaxSize = snapshotChunkMaxSize;
+  }
+
+  @Override
+  public Iterator<InstallSnapshotRequestProto> iterator() {
+    return new Iterator<InstallSnapshotRequestProto>() {
+      @Override
+      public boolean hasNext() {
+        return fileIndex < snapshot.getFiles().size();
+      }
+
+      @Override
+      public InstallSnapshotRequestProto next() {
+        return nextInstallSnapshotRequestProto();
+      }
+    };
+  }
+
+  private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
+    final int numFiles = snapshot.getFiles().size();
+    if (fileIndex >= numFiles) {
+      throw new NoSuchElementException();
+    }
+    final FileInfo info = snapshot.getFiles().get(fileIndex);
+    try {
+      if (current == null) {
+        current = new FileChunkReader(info, server.getRaftStorage().getStorageDir());
+      }
+      final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
+      if (chunk.getDone()) {
+        current.close();
+        current = null;
+        fileIndex++;
+      }
+
+      final boolean done = fileIndex == numFiles && chunk.getDone();
+      return newInstallSnapshotRequest(chunk, done);
+    } catch (IOException e) {
+      if (current != null) {
+        try {
+          current.close();
+        } catch (IOException ignored) {
+          // Best effort: the original failure is the one reported below.
+        } finally {
+          current = null; // never reuse a reader after a failure, even if close() failed
+        }
+      }
+      throw new IllegalStateException("Failed to iterate installSnapshot requests: " + this, e);
+    }
+  }
+
+  private InstallSnapshotRequestProto newInstallSnapshotRequest(FileChunkProto chunk, boolean done) {
+    final long totalSize = snapshot.getFiles().stream().mapToLong(FileInfo::getFileSize).sum();
+    synchronized (server) {
+      return ServerProtoUtils.toInstallSnapshotRequestProto(server.getMemberId(), followerId,
+          requestId, requestIndex++, server.getInfo().getCurrentTerm(), snapshot.getTermIndex(),
+          Collections.singletonList(chunk), totalSize, done, server.getRaftConf());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return server.getId() + "->" + followerId + JavaUtils.getClassSimpleName(getClass())
+        + ": requestId=" + requestId
+        + ", requestIndex=" + requestIndex
+        + ", fileIndex=" + fileIndex
+        + ", currentFile=" + current
+        + ", snapshot=" + snapshot;
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 84c842d..9db608e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -262,8 +262,8 @@ class LeaderElection implements Runnable {
       Collection<RaftPeer> others, Executor voteExecutor) {
     int submitted = 0;
     for (final RaftPeer peer : others) {
-      final RequestVoteRequestProto r = server.createRequestVoteRequest(
-          peer.getId(), electionTerm, lastEntry);
+      final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
+          server.getMemberId(), peer.getId(), electionTerm, lastEntry);
       voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
       submitted++;
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 5336420..d9aeab4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -25,22 +25,17 @@ import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.nio.file.Path;
 import java.util.*;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
@@ -181,10 +176,6 @@ public class LogAppender {
     return raftLog;
   }
 
-  public long getHalfMinTimeoutMs() {
-    return halfMinTimeoutMs;
-  }
-
   @Override
   public String toString() {
     return name;
@@ -330,126 +321,24 @@ public class LogAppender {
     }
   }
 
-  protected class SnapshotRequestIter
-      implements Iterable<InstallSnapshotRequestProto> {
-    private final SnapshotInfo snapshot;
-    private final List<FileInfo> files;
-    private FileInputStream in;
-    private int fileIndex = 0;
-
-    private FileInfo currentFileInfo;
-    private byte[] currentBuf;
-    private long currentFileSize;
-    private long currentOffset = 0;
-    private int chunkIndex = 0;
-
-    private final String requestId;
-    private int requestIndex = 0;
-
-    public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
-        throws IOException {
-      this.snapshot = snapshot;
-      this.requestId = requestId;
-      this.files = snapshot.getFiles();
-      if (files.size() > 0) {
-        startReadFile();
-      }
-    }
-
-    private void startReadFile() throws IOException {
-      currentFileInfo = files.get(fileIndex);
-      File snapshotFile = currentFileInfo.getPath().toFile();
-      currentFileSize = snapshotFile.length();
-      final int bufLength = getSnapshotChunkLength(currentFileSize);
-      currentBuf = new byte[bufLength];
-      currentOffset = 0;
-      chunkIndex = 0;
-      in = new FileInputStream(snapshotFile);
-    }
-
-    private int getSnapshotChunkLength(long len) {
-      return len < snapshotChunkMaxSize? (int)len: snapshotChunkMaxSize;
-    }
-
-    @Override
-    public Iterator<InstallSnapshotRequestProto> iterator() {
-      return new Iterator<InstallSnapshotRequestProto>() {
-        @Override
-        public boolean hasNext() {
-          return fileIndex < files.size();
-        }
-
-        @Override
-        public InstallSnapshotRequestProto next() {
-          if (fileIndex >= files.size()) {
-            throw new NoSuchElementException();
-          }
-          final int targetLength = getSnapshotChunkLength(
-              currentFileSize - currentOffset);
-          FileChunkProto chunk;
-          try {
-            chunk = readFileChunk(currentFileInfo, in, currentBuf,
-                targetLength, currentOffset, chunkIndex);
-            boolean done = (fileIndex == files.size() - 1) &&
-                chunk.getDone();
-            InstallSnapshotRequestProto request =
-                server.createInstallSnapshotRequest(follower.getPeer().getId(),
-                    requestId, requestIndex++, snapshot,
-                    Collections.singletonList(chunk), done);
-            currentOffset += targetLength;
-            chunkIndex++;
-
-            if (currentOffset >= currentFileSize) {
-              in.close();
-              fileIndex++;
-              if (fileIndex < files.size()) {
-                startReadFile();
-              }
-            }
-
-            return request;
-          } catch (IOException e) {
-            if (in != null) {
-              try {
-                in.close();
-              } catch (IOException ignored) {
-              }
-            }
-            LOG.warn("{}: Failed to prepare installSnapshot request", LogAppender.this, e);
-            throw new RuntimeException(e);
-          }
-        }
-      };
+  protected InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
+    Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
+    synchronized (server) {
+      return ServerProtoUtils.toInstallSnapshotRequestProto(server.getMemberId(), getFollowerId(),
+          server.getInfo().getCurrentTerm(), firstAvailableLogTermIndex, server.getRaftConf());
     }
   }
 
-  protected InstallSnapshotRequestProto createInstallSnapshotNotificationRequest(
-      TermIndex firstLogStartTermIndex) {
-    return server.createInstallSnapshotRequest(getFollowerId(), firstLogStartTermIndex);
-  }
-
-  private FileChunkProto readFileChunk(FileInfo fileInfo,
-      FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
-      throws IOException {
-    FileChunkProto.Builder builder = FileChunkProto.newBuilder()
-        .setOffset(offset).setChunkIndex(chunkIndex);
-    IOUtils.readFully(in, buf, 0, length);
-    Path relativePath = server.getState().getStorage().getStorageDir()
-        .relativizeToRoot(fileInfo.getPath());
-    builder.setFilename(relativePath.toString());
-    builder.setDone(offset + length == fileInfo.getFileSize());
-    builder.setFileDigest(
-        ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
-    builder.setData(ByteString.copyFrom(buf, 0, length));
-    return builder.build();
+  protected Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(
+      String requestId, SnapshotInfo snapshot) {
+    return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
   }
 
   private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
     String requestId = UUID.randomUUID().toString();
     InstallSnapshotReplyProto reply = null;
     try {
-      for (InstallSnapshotRequestProto request :
-          new SnapshotRequestIter(snapshot, requestId)) {
+      for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         follower.updateLastRpcSendTime();
         reply = server.getServerRpc().installSnapshot(request);
         follower.updateLastRpcResponseTime();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b90fe57..4a9bac6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -45,10 +45,9 @@ import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.storage.RaftStorageDirectory;
-import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -251,6 +250,11 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   @Override
+  public RaftStorage getRaftStorage() {
+    return getState().getStorage();
+  }
+
+  @Override
   public DataStreamMap getDataStreamMap() {
     return dataStreamMap;
   }
@@ -345,13 +349,9 @@ class RaftServerImpl implements RaftServer.Division,
     return role;
   }
 
-  RaftConfiguration getRaftConf() {
-    return getState().getRaftConf();
-  }
-
   @Override
-  public RaftGroup getGroup() {
-    return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
+  public RaftConfiguration getRaftConf() {
+    return getState().getRaftConf();
   }
 
   /**
@@ -1494,29 +1494,6 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
-      RaftPeerId targetId, String requestId, int requestIndex,
-      SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
-    OptionalLong totalSize = snapshot.getFiles().stream()
-        .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
-    assert totalSize.isPresent();
-    return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(), targetId,
-        requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
-        chunks, totalSize.getAsLong(), done, getRaftConf());
-  }
-
-  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
-      RaftPeerId targetId, TermIndex firstAvailableLogTermIndex) {
-    assert (firstAvailableLogTermIndex.getIndex() > 0);
-    return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(), targetId,
-        state.getCurrentTerm(), firstAvailableLogTermIndex, getRaftConf());
-  }
-
-  synchronized RequestVoteRequestProto createRequestVoteRequest(
-      RaftPeerId targetId, long term, TermIndex lastEntry) {
-    return ServerProtoUtils.toRequestVoteRequestProto(getMemberId(), targetId, term, lastEntry);
-  }
-
   void submitUpdateCommitEvent() {
     role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
new file mode 100644
index 0000000..32b062f
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+
+/** Read {@link FileChunkProto}s from a file. */
+public class FileChunkReader implements Closeable {
+  private final FileInfo info;
+  private final Path relativePath;
+  private final FileInputStream in;
+  /** The offset position of the current chunk. */
+  private long offset = 0;
+  /** The index of the current chunk. */
+  private int chunkIndex = 0;
+
+  /**
+   * Construct a reader from a file specified by the given {@link FileInfo}.
+   *
+   * @param info the information of the file.
+   * @param directory the directory where the file is stored.
+   * @throws IOException if it failed to open the file.
+   */
+  public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOException {
+    this.info = info;
+    this.relativePath = directory.relativizeToRoot(info.getPath());
+    final File f = info.getPath().toFile();
+    this.in = new FileInputStream(f);
+  }
+
+  /**
+   * Read the next chunk, which contains at most chunkMaxSize bytes of the file.
+   *
+   * @param chunkMaxSize maximum chunk size
+   * @return the chunk read from the file.
+   * @throws IOException if it failed to read the file.
+   */
+  public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
+    final long remaining = info.getFileSize() - offset;
+    final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize;
+    // ByteString.readFrom(in, n) drains the stream to EOF; read exactly one chunk instead.
+    final byte[] buffer = new byte[chunkLength];
+    org.apache.ratis.util.IOUtils.readFully(in, buffer, 0, chunkLength);
+
+    final FileChunkProto proto = FileChunkProto.newBuilder()
+        .setFilename(relativePath.toString())
+        .setOffset(offset)
+        .setChunkIndex(chunkIndex)
+        .setDone(offset + chunkLength == info.getFileSize())
+        .setData(ByteString.copyFrom(buffer))
+        .setFileDigest(ByteString.copyFrom(info.getFileDigest().getDigest()))
+        .build();
+    chunkIndex++;
+    offset += chunkLength;
+    return proto;
+  }
+
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  public String toString() {
+    return JavaUtils.getClassSimpleName(getClass())
+        + "{chunkIndex=" + chunkIndex + ", offset=" + offset + ", " + info + '}';
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
index f0aadd9..e8fb288 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,22 +31,23 @@ import org.apache.ratis.server.storage.FileInfo;
 public interface SnapshotInfo {
 
   /**
-   * Returns the term and index corresponding to this snapshot.
    * @return The term and index corresponding to this snapshot.
    */
   TermIndex getTermIndex();
 
   /**
-   * Returns the term corresponding to this snapshot.
    * @return The term corresponding to this snapshot.
    */
-  long getTerm();
+  default long getTerm() {
+    return getTermIndex().getTerm();
+  }
 
   /**
-   * Returns the index corresponding to this snapshot.
    * @return The index corresponding to this snapshot.
    */
-  long getIndex();
+  default long getIndex() {
+    return getTermIndex().getIndex();
+  }
 
   /**
    * Returns a list of files corresponding to this snapshot. This list should include all
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
index 0e21223..265523b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,12 +17,14 @@
  */
 package org.apache.ratis.statemachine.impl;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.JavaUtils;
 
 /**
  * Each snapshot has a list of files.
@@ -35,7 +37,7 @@ public class FileListSnapshotInfo implements SnapshotInfo {
 
   public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
     this.termIndex = TermIndex.newTermIndex(term, index);
-    this.files = Collections.unmodifiableList(files);
+    this.files = Collections.unmodifiableList(new ArrayList<>(files));
   }
 
   @Override
@@ -44,22 +46,12 @@ public class FileListSnapshotInfo implements SnapshotInfo {
   }
 
   @Override
-  public long getTerm() {
-    return termIndex.getTerm();
-  }
-
-  @Override
-  public long getIndex() {
-    return termIndex.getIndex();
-  }
-
-  @Override
   public List<FileInfo> getFiles() {
     return files;
   }
 
   @Override
   public String toString() {
-    return termIndex + ":" + files;
+    return JavaUtils.getClassSimpleName(getClass()) + getTermIndex() + ":" + files;
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
index 797db17..e51f26f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.statemachine.impl;
 
-import java.util.Arrays;
+import java.util.Collections;
 
 import org.apache.ratis.server.storage.FileInfo;
 
@@ -28,7 +28,7 @@ import org.apache.ratis.server.storage.FileInfo;
  */
 public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
   public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
-    super(Arrays.asList(fileInfo), term, endIndex);
+    super(Collections.singletonList(fileInfo), term, endIndex);
   }
 
   /** @return the file associated with the snapshot. */
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b91fb77..8d94b9b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -53,9 +53,11 @@ import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.util.CollectionUtils;
@@ -96,7 +98,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
 
     @Override
-    public RaftGroup getGroup() {
+    public RaftConfiguration getRaftConf() {
       return null;
     }
 
@@ -116,6 +118,11 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
 
     @Override
+    public RaftStorage getRaftStorage() {
+      return null;
+    }
+
+    @Override
     public DataStreamMap getDataStreamMap() {
       return streamMap;
     }