You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/04 06:24:39 UTC
[incubator-ratis] branch master updated: RATIS-1200. Refactor
LogAppender.SnapshotRequestIter. (#319)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 86dd7fa RATIS-1200. Refactor LogAppender.SnapshotRequestIter. (#319)
86dd7fa is described below
commit 86dd7fa68081e33428ed65dfdb613032b1378560
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Dec 4 14:24:32 2020 +0800
RATIS-1200. Refactor LogAppender.SnapshotRequestIter. (#319)
* RATIS-1200. Refactor LogAppender.SnapshotRequestIter.
* Fix a bug and checkstyle
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 5 +-
.../java/org/apache/ratis/server/RaftServer.java | 12 +-
.../ratis/server/impl/InstallSnapshotRequests.java | 137 +++++++++++++++++++++
.../apache/ratis/server/impl/LeaderElection.java | 4 +-
.../org/apache/ratis/server/impl/LogAppender.java | 129 ++-----------------
.../apache/ratis/server/impl/RaftServerImpl.java | 39 ++----
.../ratis/server/storage/FileChunkReader.java | 91 ++++++++++++++
.../apache/ratis/statemachine/SnapshotInfo.java | 13 +-
.../statemachine/impl/FileListSnapshotInfo.java | 18 +--
.../statemachine/impl/SingleFileSnapshotInfo.java | 6 +-
.../ratis/datastream/DataStreamBaseTest.java | 9 +-
11 files changed, 283 insertions(+), 180 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 9bbf9fe..38a2e41 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -477,8 +477,7 @@ public class GrpcLogAppender extends LogAppender {
final String requestId = UUID.randomUUID().toString();
try {
snapshotRequestObserver = getClient().installSnapshot(responseHandler);
- for (InstallSnapshotRequestProto request :
- new SnapshotRequestIter(snapshot, requestId)) {
+ for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
if (isAppenderRunning()) {
snapshotRequestObserver.onNext(request);
getFollower().updateLastRpcSendTime();
@@ -524,7 +523,7 @@ public class GrpcLogAppender extends LogAppender {
final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
// prepare and enqueue the notify install snapshot request.
- final InstallSnapshotRequestProto request = createInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
+ final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
if (LOG.isInfoEnabled()) {
LOG.info("{}: send {}", this, ServerProtoUtils.toString(request));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index c4b9b0f..d6e267b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -23,10 +23,12 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcFactory;
import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
@@ -67,7 +69,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
DivisionInfo getInfo();
/** @return the {@link RaftGroup} for this division. */
- RaftGroup getGroup();
+ default RaftGroup getGroup() {
+ return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
+ }
+
+ /** @return the current {@link RaftConfiguration} for this division. */
+ RaftConfiguration getRaftConf();
/** @return the {@link RaftServer} containing this division. */
RaftServer getRaftServer();
@@ -78,6 +85,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the raft log of this division. */
RaftLog getRaftLog();
+ /** @return the storage of this division. */
+ RaftStorage getRaftStorage();
+
/** @return the data stream map of this division. */
DataStreamMap getDataStreamMap();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
new file mode 100644
index 0000000..448770a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.storage.FileChunkReader;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An {@link Iterable} of {@link InstallSnapshotRequestProto} for sending a snapshot.
+ *
+ * The snapshot is sent by one or more requests, where
+ * a snapshot has one or more files, and
+ * a file is sent by one or more chunks.
+ * The number of requests is equal to the sum of the numbers of chunks of each file.
+ */
+class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
+ private final RaftServer.Division server;
+ private final RaftPeerId followerId;
+
+ /** The snapshot to be sent. */
+ private final SnapshotInfo snapshot;
+ /** A fixed id for all the requests. */
+ private final String requestId;
+
+ /** Maximum chunk size. */
+ private final int snapshotChunkMaxSize;
+
+ /** The index of the current request. */
+ private int requestIndex = 0;
+
+ /** The index of the current file. */
+ private int fileIndex = 0;
+ /** The current file. */
+ private FileChunkReader current;
+
+ InstallSnapshotRequests(RaftServer.Division server, RaftPeerId followerId,
+ String requestId, SnapshotInfo snapshot, int snapshotChunkMaxSize) {
+ this.server = server;
+ this.followerId = followerId;
+ this.requestId = requestId;
+ this.snapshot = snapshot;
+ this.snapshotChunkMaxSize = snapshotChunkMaxSize;
+ }
+
+ @Override
+ public Iterator<InstallSnapshotRequestProto> iterator() {
+ return new Iterator<InstallSnapshotRequestProto>() {
+ @Override
+ public boolean hasNext() {
+ return fileIndex < snapshot.getFiles().size();
+ }
+
+ @Override
+ public InstallSnapshotRequestProto next() {
+ return nextInstallSnapshotRequestProto();
+ }
+ };
+ }
+
+ private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
+ final int numFiles = snapshot.getFiles().size();
+ if (fileIndex >= numFiles) {
+ throw new NoSuchElementException();
+ }
+ final FileInfo info = snapshot.getFiles().get(fileIndex);
+ try {
+ if (current == null) {
+ current = new FileChunkReader(info, server.getRaftStorage().getStorageDir());
+ }
+ final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
+ if (chunk.getDone()) {
+ current.close();
+ current = null;
+ fileIndex++;
+ }
+
+ final boolean done = fileIndex == numFiles && chunk.getDone();
+ return newInstallSnapshotRequest(chunk, done);
+ } catch (IOException e) {
+ if (current != null) {
+ try {
+ current.close();
+ current = null;
+ } catch (IOException ignored) {
+ }
+ }
+ throw new IllegalStateException("Failed to iterate installSnapshot requests: " + this, e);
+ }
+ }
+
+ private InstallSnapshotRequestProto newInstallSnapshotRequest(FileChunkProto chunk, boolean done) {
+ final long totalSize = snapshot.getFiles().stream().mapToLong(FileInfo::getFileSize).reduce(Long::sum).orElseThrow(
+ () -> new IllegalStateException("Failed to compute total size for snapshot " + snapshot));
+ synchronized (server) {
+ return ServerProtoUtils.toInstallSnapshotRequestProto(server.getMemberId(), followerId,
+ requestId, requestIndex++, server.getInfo().getCurrentTerm(), snapshot.getTermIndex(),
+ Collections.singletonList(chunk), totalSize, done, server.getRaftConf());
+ }
+ }
+
+
+ @Override
+ public String toString() {
+ return server.getId() + "->" + followerId + JavaUtils.getClassSimpleName(getClass())
+ + ": requestId=" + requestId
+ + ", requestIndex=" + requestIndex
+ + ", fileIndex=" + fileIndex
+ + ", currentFile=" + current
+ + ", snapshot=" + snapshot;
+ }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 84c842d..9db608e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -262,8 +262,8 @@ class LeaderElection implements Runnable {
Collection<RaftPeer> others, Executor voteExecutor) {
int submitted = 0;
for (final RaftPeer peer : others) {
- final RequestVoteRequestProto r = server.createRequestVoteRequest(
- peer.getId(), electionTerm, lastEntry);
+ final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
+ server.getMemberId(), peer.getId(), electionTerm, lastEntry);
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted++;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 5336420..d9aeab4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -25,22 +25,17 @@ import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
import org.apache.ratis.server.raftlog.RaftLogIOException;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.nio.file.Path;
import java.util.*;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
@@ -181,10 +176,6 @@ public class LogAppender {
return raftLog;
}
- public long getHalfMinTimeoutMs() {
- return halfMinTimeoutMs;
- }
-
@Override
public String toString() {
return name;
@@ -330,126 +321,24 @@ public class LogAppender {
}
}
- protected class SnapshotRequestIter
- implements Iterable<InstallSnapshotRequestProto> {
- private final SnapshotInfo snapshot;
- private final List<FileInfo> files;
- private FileInputStream in;
- private int fileIndex = 0;
-
- private FileInfo currentFileInfo;
- private byte[] currentBuf;
- private long currentFileSize;
- private long currentOffset = 0;
- private int chunkIndex = 0;
-
- private final String requestId;
- private int requestIndex = 0;
-
- public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
- throws IOException {
- this.snapshot = snapshot;
- this.requestId = requestId;
- this.files = snapshot.getFiles();
- if (files.size() > 0) {
- startReadFile();
- }
- }
-
- private void startReadFile() throws IOException {
- currentFileInfo = files.get(fileIndex);
- File snapshotFile = currentFileInfo.getPath().toFile();
- currentFileSize = snapshotFile.length();
- final int bufLength = getSnapshotChunkLength(currentFileSize);
- currentBuf = new byte[bufLength];
- currentOffset = 0;
- chunkIndex = 0;
- in = new FileInputStream(snapshotFile);
- }
-
- private int getSnapshotChunkLength(long len) {
- return len < snapshotChunkMaxSize? (int)len: snapshotChunkMaxSize;
- }
-
- @Override
- public Iterator<InstallSnapshotRequestProto> iterator() {
- return new Iterator<InstallSnapshotRequestProto>() {
- @Override
- public boolean hasNext() {
- return fileIndex < files.size();
- }
-
- @Override
- public InstallSnapshotRequestProto next() {
- if (fileIndex >= files.size()) {
- throw new NoSuchElementException();
- }
- final int targetLength = getSnapshotChunkLength(
- currentFileSize - currentOffset);
- FileChunkProto chunk;
- try {
- chunk = readFileChunk(currentFileInfo, in, currentBuf,
- targetLength, currentOffset, chunkIndex);
- boolean done = (fileIndex == files.size() - 1) &&
- chunk.getDone();
- InstallSnapshotRequestProto request =
- server.createInstallSnapshotRequest(follower.getPeer().getId(),
- requestId, requestIndex++, snapshot,
- Collections.singletonList(chunk), done);
- currentOffset += targetLength;
- chunkIndex++;
-
- if (currentOffset >= currentFileSize) {
- in.close();
- fileIndex++;
- if (fileIndex < files.size()) {
- startReadFile();
- }
- }
-
- return request;
- } catch (IOException e) {
- if (in != null) {
- try {
- in.close();
- } catch (IOException ignored) {
- }
- }
- LOG.warn("{}: Failed to prepare installSnapshot request", LogAppender.this, e);
- throw new RuntimeException(e);
- }
- }
- };
+ protected InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
+ Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
+ synchronized (server) {
+ return ServerProtoUtils.toInstallSnapshotRequestProto(server.getMemberId(), getFollowerId(),
+ server.getInfo().getCurrentTerm(), firstAvailableLogTermIndex, server.getRaftConf());
}
}
- protected InstallSnapshotRequestProto createInstallSnapshotNotificationRequest(
- TermIndex firstLogStartTermIndex) {
- return server.createInstallSnapshotRequest(getFollowerId(), firstLogStartTermIndex);
- }
-
- private FileChunkProto readFileChunk(FileInfo fileInfo,
- FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
- throws IOException {
- FileChunkProto.Builder builder = FileChunkProto.newBuilder()
- .setOffset(offset).setChunkIndex(chunkIndex);
- IOUtils.readFully(in, buf, 0, length);
- Path relativePath = server.getState().getStorage().getStorageDir()
- .relativizeToRoot(fileInfo.getPath());
- builder.setFilename(relativePath.toString());
- builder.setDone(offset + length == fileInfo.getFileSize());
- builder.setFileDigest(
- ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
- builder.setData(ByteString.copyFrom(buf, 0, length));
- return builder.build();
+ protected Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(
+ String requestId, SnapshotInfo snapshot) {
+ return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
}
private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
String requestId = UUID.randomUUID().toString();
InstallSnapshotReplyProto reply = null;
try {
- for (InstallSnapshotRequestProto request :
- new SnapshotRequestIter(snapshot, requestId)) {
+ for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
follower.updateLastRpcSendTime();
reply = server.getServerRpc().installSnapshot(request);
follower.updateLastRpcResponseTime();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b90fe57..4a9bac6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -45,10 +45,9 @@ import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
-import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -251,6 +250,11 @@ class RaftServerImpl implements RaftServer.Division,
}
@Override
+ public RaftStorage getRaftStorage() {
+ return getState().getStorage();
+ }
+
+ @Override
public DataStreamMap getDataStreamMap() {
return dataStreamMap;
}
@@ -345,13 +349,9 @@ class RaftServerImpl implements RaftServer.Division,
return role;
}
- RaftConfiguration getRaftConf() {
- return getState().getRaftConf();
- }
-
@Override
- public RaftGroup getGroup() {
- return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
+ public RaftConfiguration getRaftConf() {
+ return getState().getRaftConf();
}
/**
@@ -1494,29 +1494,6 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
- RaftPeerId targetId, String requestId, int requestIndex,
- SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
- OptionalLong totalSize = snapshot.getFiles().stream()
- .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
- assert totalSize.isPresent();
- return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(), targetId,
- requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
- chunks, totalSize.getAsLong(), done, getRaftConf());
- }
-
- synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
- RaftPeerId targetId, TermIndex firstAvailableLogTermIndex) {
- assert (firstAvailableLogTermIndex.getIndex() > 0);
- return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(), targetId,
- state.getCurrentTerm(), firstAvailableLogTermIndex, getRaftConf());
- }
-
- synchronized RequestVoteRequestProto createRequestVoteRequest(
- RaftPeerId targetId, long term, TermIndex lastEntry) {
- return ServerProtoUtils.toRequestVoteRequestProto(getMemberId(), targetId, term, lastEntry);
- }
-
void submitUpdateCommitEvent() {
role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
new file mode 100644
index 0000000..32b062f
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+
+/** Read {@link FileChunkProto}s from a file. */
+public class FileChunkReader implements Closeable {
+ private final FileInfo info;
+ private final Path relativePath;
+ private final FileInputStream in;
+ /** The offset position of the current chunk. */
+ private long offset = 0;
+ /** The index of the current chunk. */
+ private int chunkIndex = 0;
+
+ /**
+ * Construct a reader from a file specified by the given {@link FileInfo}.
+ *
+ * @param info the information of the file.
+ * @param directory the directory where the file is stored.
+ * @throws IOException if it failed to open the file.
+ */
+ public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOException {
+ this.info = info;
+ this.relativePath = directory.relativizeToRoot(info.getPath());
+ final File f = info.getPath().toFile();
+ this.in = new FileInputStream(f);
+ }
+
+ /**
+ * Read the next chunk.
+ *
+ * @param chunkMaxSize maximum chunk size
+ * @return the chunk read from the file.
+ * @throws IOException if it failed to read the file.
+ */
+ public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
+ final long remaining = info.getFileSize() - offset;
+ final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize;
+ final ByteString data = ByteString.readFrom(in, chunkLength);
+
+ final FileChunkProto proto = FileChunkProto.newBuilder()
+ .setFilename(relativePath.toString())
+ .setOffset(offset)
+ .setChunkIndex(chunkIndex)
+ .setDone(offset + chunkLength == info.getFileSize())
+ .setData(data)
+ .setFileDigest(ByteString.copyFrom(info.getFileDigest().getDigest()))
+ .build();
+ chunkIndex++;
+ offset += chunkLength;
+ return proto;
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(getClass())
+ + "{chunkIndex=" + chunkIndex
+ + ", offset=" + offset
+ + ", " + info + '}';
+ }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
index f0aadd9..e8fb288 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,22 +31,23 @@ import org.apache.ratis.server.storage.FileInfo;
public interface SnapshotInfo {
/**
- * Returns the term and index corresponding to this snapshot.
* @return The term and index corresponding to this snapshot.
*/
TermIndex getTermIndex();
/**
- * Returns the term corresponding to this snapshot.
* @return The term corresponding to this snapshot.
*/
- long getTerm();
+ default long getTerm() {
+ return getTermIndex().getTerm();
+ }
/**
- * Returns the index corresponding to this snapshot.
* @return The index corresponding to this snapshot.
*/
- long getIndex();
+ default long getIndex() {
+ return getTermIndex().getIndex();
+ }
/**
* Returns a list of files corresponding to this snapshot. This list should include all
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
index 0e21223..265523b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,12 +17,14 @@
*/
package org.apache.ratis.statemachine.impl;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.JavaUtils;
/**
* Each snapshot has a list of files.
@@ -35,7 +37,7 @@ public class FileListSnapshotInfo implements SnapshotInfo {
public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
this.termIndex = TermIndex.newTermIndex(term, index);
- this.files = Collections.unmodifiableList(files);
+ this.files = Collections.unmodifiableList(new ArrayList<>(files));
}
@Override
@@ -44,22 +46,12 @@ public class FileListSnapshotInfo implements SnapshotInfo {
}
@Override
- public long getTerm() {
- return termIndex.getTerm();
- }
-
- @Override
- public long getIndex() {
- return termIndex.getIndex();
- }
-
- @Override
public List<FileInfo> getFiles() {
return files;
}
@Override
public String toString() {
- return termIndex + ":" + files;
+ return JavaUtils.getClassSimpleName(getClass()) + getTermIndex() + ":" + files;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
index 797db17..e51f26f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.statemachine.impl;
-import java.util.Arrays;
+import java.util.Collections;
import org.apache.ratis.server.storage.FileInfo;
@@ -28,7 +28,7 @@ import org.apache.ratis.server.storage.FileInfo;
*/
public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
- super(Arrays.asList(fileInfo), term, endIndex);
+ super(Collections.singletonList(fileInfo), term, endIndex);
}
/** @return the file associated with the snapshot. */
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b91fb77..8d94b9b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -53,9 +53,11 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.util.CollectionUtils;
@@ -96,7 +98,7 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
- public RaftGroup getGroup() {
+ public RaftConfiguration getRaftConf() {
return null;
}
@@ -116,6 +118,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public RaftStorage getRaftStorage() {
+ return null;
+ }
+
+ @Override
public DataStreamMap getDataStreamMap() {
return streamMap;
}