You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/10/24 02:57:54 UTC
[ratis] branch master updated: RATIS-1723. CounterStateMachine should update the latest snapshot. (#765)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b8117f414 RATIS-1723. CounterStateMachine should update the latest snapshot. (#765)
b8117f414 is described below
commit b8117f41481f5f5861566a68bb1ea4dc7c60320c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Oct 24 10:57:48 2022 +0800
RATIS-1723. CounterStateMachine should update the latest snapshot. (#765)
* RATIS-1723. CounterStateMachine should update the latest snapshot.
* Address review comments and fix bugs.
* delete md5 file
---
.../main/java/org/apache/ratis/util/FileUtils.java | 6 ++--
.../java/org/apache/ratis/util/MD5FileUtil.java | 28 ++++++++++-----
.../arithmetic/ArithmeticStateMachine.java | 22 +++++++-----
.../counter/server/CounterStateMachine.java | 19 ++++++++--
.../statemachine/impl/FileListSnapshotInfo.java | 10 ++++--
.../impl/SimpleStateMachineStorage.java | 37 ++++++++++++-------
.../statemachine/impl/SingleFileSnapshotInfo.java | 9 +++--
.../impl/SimpleStateMachine4Testing.java | 41 +++++++---------------
8 files changed, 105 insertions(+), 67 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index 40a51e9f9..f978a79c3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -144,11 +144,13 @@ public interface FileUtils {
}
/** The same as passing f.toPath() to {@link #delete(Path)}. */
- static void deleteFileQuietly(File f) {
+ static boolean deleteFileQuietly(File f) {
try {
delete(f.toPath());
+ return true;
} catch (Exception ex) {
- LOG.debug("File delete was not susccesful {}", f.getAbsoluteFile(), ex);
+ LOG.debug("File delete was not successful {}", f.getAbsoluteFile(), ex);
+ return false;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java b/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
index d7201caa9..2e5eb2984 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -26,9 +26,9 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.regex.Matcher;
@@ -122,16 +122,26 @@ public abstract class MD5FileUtil {
* Read dataFile and compute its MD5 checksum.
*/
public static MD5Hash computeMd5ForFile(File dataFile) throws IOException {
- InputStream in = new FileInputStream(dataFile);
- try {
- MessageDigest digester = MD5Hash.getDigester();
- DigestInputStream dis = new DigestInputStream(in, digester);
+ final MessageDigest digester = MD5Hash.getDigester();
+ try (DigestInputStream dis = new DigestInputStream(Files.newInputStream(dataFile.toPath()), digester)) {
IOUtils.readFully(dis, 128*1024);
+ }
+ return new MD5Hash(digester.digest());
+ }
- return new MD5Hash(digester.digest());
- } finally {
- IOUtils.cleanup(LOG, in);
+ public static MD5Hash computeAndSaveMd5ForFile(File dataFile) {
+ final MD5Hash md5;
+ try {
+ md5 = computeMd5ForFile(dataFile);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to compute MD5 for file " + dataFile, e);
+ }
+ try {
+ saveMD5File(dataFile, md5);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to save MD5 " + md5 + " for file " + dataFile, e);
}
+ return md5;
}
/**
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
index 4b11fa29b..ac7b199fc 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
@@ -18,6 +18,7 @@
package org.apache.ratis.examples.arithmetic;
import org.apache.ratis.examples.arithmetic.expression.Expression;
+import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.Message;
@@ -25,6 +26,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
@@ -33,6 +35,7 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MD5FileUtil;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -102,14 +105,13 @@ public class ArithmeticStateMachine extends BaseStateMachine {
+ "\", last applied index=" + last);
}
+ final MD5Hash md5 = MD5FileUtil.computeAndSaveMd5ForFile(snapshotFile);
+ final FileInfo info = new FileInfo(snapshotFile.toPath(), md5);
+ storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, last));
return last.getIndex();
}
public long loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException {
- return load(snapshot, false);
- }
-
- private long load(SingleFileSnapshotInfo snapshot, boolean reload) throws IOException {
if (snapshot == null) {
LOG.warn("The snapshot info is null.");
return RaftLog.INVALID_LOG_INDEX;
@@ -120,17 +122,21 @@ public class ArithmeticStateMachine extends BaseStateMachine {
return RaftLog.INVALID_LOG_INDEX;
}
+ // verify md5
+ final MD5Hash md5 = snapshot.getFile().getFileDigest();
+ if (md5 != null) {
+ MD5FileUtil.verifySavedMD5(snapshotFile, md5);
+ }
+
final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
try(AutoCloseableLock writeLock = writeLock();
ObjectInputStream in = new ObjectInputStream(
new BufferedInputStream(new FileInputStream(snapshotFile)))) {
- if (reload) {
- reset();
- }
+ reset();
setLastAppliedTermIndex(last);
variables.putAll(JavaUtils.cast(in.readObject()));
} catch (ClassNotFoundException e) {
- throw new IllegalStateException(e);
+ throw new IllegalStateException("Failed to load " + snapshot, e);
}
return last.getIndex();
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index d5a027910..f383d3a83 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -18,6 +18,7 @@
package org.apache.ratis.examples.counter.server;
import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.Message;
@@ -25,12 +26,14 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MD5FileUtil;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -46,10 +49,10 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* A {@link org.apache.ratis.statemachine.StateMachine} implementation for the {@link CounterServer}.
* This class maintain a {@link AtomicInteger} object as a state and accept two commands:
- *
+ * <p>
* - {@link CounterCommand#GET} is a readonly command
* which is handled by the {@link #query(Message)} method.
- *
+ * <p>
* - {@link CounterCommand#INCREMENT} is a transactional command
* which is handled by the {@link #applyTransaction(TransactionContext)} method.
*/
@@ -139,6 +142,11 @@ public class CounterStateMachine extends BaseStateMachine {
+ "\", last applied index=" + state.getApplied());
}
+ // update storage
+ final MD5Hash md5 = MD5FileUtil.computeAndSaveMd5ForFile(snapshotFile);
+ final FileInfo info = new FileInfo(snapshotFile.toPath(), md5);
+ storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, state.getApplied()));
+
//return the index of the stored snapshot (which is the last applied one)
return index;
}
@@ -153,7 +161,6 @@ public class CounterStateMachine extends BaseStateMachine {
private long load(SingleFileSnapshotInfo snapshot) throws IOException {
//check null
if (snapshot == null) {
- LOG.warn("The snapshot info is null.");
return RaftLog.INVALID_LOG_INDEX;
}
//check if the snapshot file exists.
@@ -163,6 +170,12 @@ public class CounterStateMachine extends BaseStateMachine {
return RaftLog.INVALID_LOG_INDEX;
}
+ // verify md5
+ final MD5Hash md5 = snapshot.getFile().getFileDigest();
+ if (md5 != null) {
+ MD5FileUtil.verifySavedMD5(snapshotPath.toFile(), md5);
+ }
+
//read the TermIndex from the snapshot file name
final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
index 540c5bf6e..afc43852a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/FileListSnapshotInfo.java
@@ -28,18 +28,22 @@ import org.apache.ratis.util.JavaUtils;
/**
* Each snapshot has a list of files.
- *
+ * <p>
* The objects of this class are immutable.
*/
public class FileListSnapshotInfo implements SnapshotInfo {
private final TermIndex termIndex;
private final List<FileInfo> files;
- public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
- this.termIndex = TermIndex.valueOf(term, index);
+ public FileListSnapshotInfo(List<FileInfo> files, TermIndex termIndex) {
+ this.termIndex = termIndex;
this.files = Collections.unmodifiableList(new ArrayList<>(files));
}
+ public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
+ this(files, TermIndex.valueOf(term, index));
+ }
+
@Override
public TermIndex getTermIndex() {
return termIndex;
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index 1be9cb281..0a6ce2936 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -21,7 +21,6 @@ import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -42,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -60,13 +60,12 @@ public class SimpleStateMachineStorage implements StateMachineStorage {
Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
private volatile File stateMachineDir = null;
-
- private volatile SingleFileSnapshotInfo currentSnapshot = null;
+ private final AtomicReference<SingleFileSnapshotInfo> latestSnapshot = new AtomicReference<>();
@Override
public void init(RaftStorage storage) throws IOException {
this.stateMachineDir = storage.getStorageDir().getStateMachineDir();
- loadLatestSnapshot();
+ getLatestSnapshot();
}
@Override
@@ -109,14 +108,18 @@ public class SimpleStateMachineStorage implements StateMachineStorage {
final List<SingleFileSnapshotInfo> allSnapshotFiles = getSingleFileSnapshotInfos(stateMachineDir.toPath());
if (allSnapshotFiles.size() > snapshotRetentionPolicy.getNumSnapshotsRetained()) {
- allSnapshotFiles.sort(Comparator.comparing(SnapshotInfo::getIndex).reversed());
+ allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed());
List<File> snapshotFilesToBeCleaned = allSnapshotFiles.subList(
snapshotRetentionPolicy.getNumSnapshotsRetained(), allSnapshotFiles.size()).stream()
.map(singleFileSnapshotInfo -> singleFileSnapshotInfo.getFile().getPath().toFile())
.collect(Collectors.toList());
for (File snapshotFile : snapshotFilesToBeCleaned) {
LOG.info("Deleting old snapshot at {}", snapshotFile.getAbsolutePath());
- FileUtils.deleteFileQuietly(snapshotFile);
+ final boolean deleted = FileUtils.deleteFileQuietly(snapshotFile);
+ if (deleted) {
+ final File md5file = MD5FileUtil.getDigestFileForFile(snapshotFile);
+ FileUtils.deleteFileQuietly(md5file);
+ }
}
}
}
@@ -178,11 +181,9 @@ public class SimpleStateMachineStorage implements StateMachineStorage {
return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex());
}
- public void loadLatestSnapshot() throws IOException {
- if (stateMachineDir == null) {
- return;
- }
- this.currentSnapshot = findLatestSnapshot(stateMachineDir.toPath());
+ public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo info) {
+ return latestSnapshot.updateAndGet(
+ previous -> previous == null || info.getIndex() > previous.getIndex()? info: previous);
}
public static String getSnapshotFileName(long term, long endIndex) {
@@ -191,7 +192,19 @@ public class SimpleStateMachineStorage implements StateMachineStorage {
@Override
public SingleFileSnapshotInfo getLatestSnapshot() {
- return currentSnapshot;
+ final SingleFileSnapshotInfo s = latestSnapshot.get();
+ if (s != null) {
+ return s;
+ }
+ final File dir = stateMachineDir;
+ if (dir == null) {
+ return null;
+ }
+ try {
+ return updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
+ } catch (IOException ignored) {
+ return null;
+ }
}
@VisibleForTesting
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
index e51f26f14..14d501a4a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java
@@ -19,16 +19,21 @@ package org.apache.ratis.statemachine.impl;
import java.util.Collections;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
/**
* Each snapshot only has a single file.
- *
+ * <p>
* The objects of this class are immutable.
*/
public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
+ public SingleFileSnapshotInfo(FileInfo fileInfo, TermIndex termIndex) {
+ super(Collections.singletonList(fileInfo), termIndex);
+ }
+
public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
- super(Collections.singletonList(fileInfo), term, endIndex);
+ this(fileInfo, TermIndex.valueOf(term, endIndex));
}
/** @return the file associated with the snapshot. */
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 141c221ba..800ab02ea 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -36,7 +36,9 @@ import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogInputStream;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream;
+import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -232,7 +234,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
@Override
public synchronized void reinitialize() throws IOException {
LOG.info("Reinitializing " + this);
- loadSnapshot(storage.findLatestSnapshot(getStateMachineDir().toPath()));
+ loadSnapshot(storage.getLatestSnapshot());
if (getLifeCycleState() == LifeCycle.State.PAUSED) {
getLifeCycle().transition(LifeCycle.State.STARTING);
getLifeCycle().transition(LifeCycle.State.RUNNING);
@@ -258,10 +260,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
final long endIndex = termIndex.getIndex();
// TODO: snapshot should be written to a tmp file, then renamed
- File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(),
- termIndex.getIndex());
- LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
- termIndex.getIndex(), snapshotFile);
+ final File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(), endIndex);
+ LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile);
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false,
segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (final LogEntryProto entry : indexMap.values()) {
@@ -276,20 +276,10 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
LOG.warn("Failed to take snapshot", e);
}
- try {
- final MD5Hash digest = MD5FileUtil.computeMd5ForFile(snapshotFile);
- MD5FileUtil.saveMD5File(snapshotFile, digest);
- } catch (IOException e) {
- LOG.warn("Hit IOException when computing MD5 for snapshot file "
- + snapshotFile, e);
- }
+ final MD5Hash digest = MD5FileUtil.computeAndSaveMd5ForFile(snapshotFile);
+ final FileInfo info = new FileInfo(snapshotFile.toPath(), digest);
+ storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, termIndex));
- try {
- this.storage.loadLatestSnapshot();
- } catch (IOException e) {
- LOG.warn("Hit IOException when loading latest snapshot for snapshot file "
- + snapshotFile, e);
- }
// TODO: purge log segments
return endIndex;
}
@@ -299,14 +289,11 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
return storage;
}
- private synchronized long loadSnapshot(SingleFileSnapshotInfo snapshot)
- throws IOException {
- if (snapshot == null || !snapshot.getFile().getPath().toFile().exists()) {
- LOG.info("The snapshot file {} does not exist",
- snapshot == null ? null : snapshot.getFile());
- return RaftLog.INVALID_LOG_INDEX;
- } else {
+ private synchronized void loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException {
LOG.info("Loading snapshot {}", snapshot);
+ if (snapshot == null) {
+ return;
+ }
final long endIndex = snapshot.getIndex();
try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
@@ -323,9 +310,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
"endIndex=%s, indexMap=%s", endIndex, indexMap);
this.endIndexLastCkpt = endIndex;
setLastAppliedTermIndex(snapshot.getTermIndex());
- this.storage.loadLatestSnapshot();
- return endIndex;
- }
+ this.storage.updateLatestSnapshot(snapshot);
}
/**