You are viewing a plain-text version of this content. The canonical version is in the commits@ratis.apache.org mailing-list archive (the original hyperlink was lost in this rendering).
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/09/23 21:00:20 UTC
[incubator-ratis] branch master updated: RATIS-677. Log entry marked corrupt due to ChecksumException.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c564743 RATIS-677. Log entry marked corrupt due to ChecksumException.
c564743 is described below
commit c564743fff6271b6d3a827997b72f3bb5be64b04
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Sep 23 13:57:40 2019 -0700
RATIS-677. Log entry marked corrupt due to ChecksumException.
---
.../apache/ratis/server/RaftServerConfigKeys.java | 28 +++++++++
.../org/apache/ratis/server/impl/ServerState.java | 3 +-
.../ratis/server/raftlog/segmented/LogSegment.java | 38 ++++++++----
.../raftlog/segmented/SegmentedRaftLogReader.java | 6 +-
.../apache/ratis/server/storage/RaftStorage.java | 14 ++++-
.../apache/ratis/server/ServerRestartTests.java | 70 ++++++++++++++++++++++
.../raftlog/segmented/TestSegmentedRaftLog.java | 4 ++
.../ratis/server/storage/TestRaftStorage.java | 7 +--
8 files changed, 149 insertions(+), 21 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 405d684..c5e9504 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -26,8 +26,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.function.Function;
import static org.apache.ratis.conf.ConfUtils.*;
@@ -216,6 +218,32 @@ public interface RaftServerConfigKeys {
setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum);
}
+ /** The policy to handle corrupted raft log. */
+ enum CorruptionPolicy {
+ /** Rethrow the exception. */
+ EXCEPTION,
+ /** Print a warn log message and return all uncorrupted log entries up to the corruption. */
+ WARN_AND_RETURN;
+
+ public static CorruptionPolicy getDefault() {
+ return EXCEPTION;
+ }
+
+ public static <T> CorruptionPolicy get(T supplier, Function<T, CorruptionPolicy> getMethod) {
+ return Optional.ofNullable(supplier).map(getMethod).orElse(getDefault());
+ }
+ }
+
+ String CORRUPTION_POLICY_KEY = PREFIX + ".corruption.policy";
+ CorruptionPolicy CORRUPTION_POLICY_DEFAULT = CorruptionPolicy.getDefault();
+ static CorruptionPolicy corruptionPolicy(RaftProperties properties) {
+ return get(properties::getEnum,
+ CORRUPTION_POLICY_KEY, CORRUPTION_POLICY_DEFAULT, getDefaultLog());
+ }
+ static void setCorruptionPolicy(RaftProperties properties, CorruptionPolicy corruptionPolicy) {
+ set(properties::setEnum, CORRUPTION_POLICY_KEY, corruptionPolicy);
+ }
+
interface StateMachineData {
String PREFIX = Log.PREFIX + ".statemachine.data";
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 6805ae4..82993b5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -105,7 +105,8 @@ public class ServerState implements Closeable {
// use full uuid string to create a subdirectory
final File dir = chooseStorageDir(RaftServerConfigKeys.storageDirs(prop),
group.getGroupId().getUuid().toString());
- storage = new RaftStorage(dir, RaftServerConstants.StartupOption.REGULAR);
+ storage = new RaftStorage(dir, RaftServerConstants.StartupOption.REGULAR,
+ RaftServerConfigKeys.Log.corruptionPolicy(prop));
snapshotManager = new SnapshotManager(storage, id);
long lastApplied = initStatemachine(stateMachine, group.getGroupId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2ffc907..12bd195 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -81,14 +82,14 @@ class LogSegment implements Comparable<Long> {
}
@VisibleForTesting
- static LogSegment newCloseSegment(RaftStorage storage,
- long start, long end) {
+ static LogSegment newCloseSegment(RaftStorage storage, long start, long end) {
Preconditions.assertTrue(start >= 0 && end >= start);
return new LogSegment(storage, false, start, end);
}
- private static int readSegmentFile(File file, long start, long end,
- boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws IOException {
+ private static int readSegmentFile(File file, long start, long end, boolean isOpen,
+ CorruptionPolicy corruptionPolicy, Consumer<LogEntryProto> entryConsumer)
+ throws IOException {
int count = 0;
try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen)) {
for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
@@ -102,19 +103,31 @@ class LogSegment implements Comparable<Long> {
}
count++;
}
+ } catch (IOException ioe) {
+ switch (corruptionPolicy) {
+ case EXCEPTION: throw ioe;
+ case WARN_AND_RETURN:
+ LOG.warn("Failed to read segment file {} (start={}, end={}, isOpen? {}): only {} entries read successfully",
+ file, start, end, isOpen, count, ioe);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected enum value: " + corruptionPolicy
+ + ", class=" + CorruptionPolicy.class);
+ }
}
+
return count;
}
- static LogSegment loadSegment(RaftStorage storage, File file,
- long start, long end, boolean isOpen,
+ static LogSegment loadSegment(RaftStorage storage, File file, long start, long end, boolean isOpen,
boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer)
throws IOException {
final LogSegment segment = isOpen ?
LogSegment.newOpenSegment(storage, start) :
LogSegment.newCloseSegment(storage, start, end);
- final int entryCount = readSegmentFile(file, start, end, isOpen, entry -> {
+ final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
+ final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, entry -> {
segment.append(keepEntryInCache || isOpen, entry);
if (logConsumer != null) {
logConsumer.accept(entry);
@@ -154,7 +167,7 @@ class LogSegment implements Comparable<Long> {
final File file = getSegmentFile();
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
- readSegmentFile(file, startIndex, endIndex, isOpen,
+ readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(),
entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
loadingTimes.incrementAndGet();
return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
@@ -168,11 +181,11 @@ class LogSegment implements Comparable<Long> {
}
private volatile boolean isOpen;
- private long totalSize;
+ private long totalSize = SegmentedRaftLogFormat.getHeaderLength();
private final long startIndex;
private volatile long endIndex;
private final RaftStorage storage;
- private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader();
+ private final LogEntryLoader cacheLoader = new LogEntryLoader();
/** later replace it with a metric */
private final AtomicInteger loadingTimes = new AtomicInteger();
@@ -191,7 +204,6 @@ class LogSegment implements Comparable<Long> {
this.isOpen = isOpen;
this.startIndex = start;
this.endIndex = end;
- totalSize = SegmentedRaftLogFormat.getHeaderLength();
}
long getStartIndex() {
@@ -210,6 +222,10 @@ class LogSegment implements Comparable<Long> {
return Math.toIntExact(endIndex - startIndex + 1);
}
+ CorruptionPolicy getLogCorruptionPolicy() {
+ return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
+ }
+
void appendToOpenSegment(LogEntryProto entry) {
Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this);
append(true, entry);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index f6203c0..f90d4c6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -294,9 +294,9 @@ class SegmentedRaftLogReader implements Closeable {
int expectedChecksum = in.readInt();
int calculatedChecksum = (int) checksum.getValue();
if (expectedChecksum != calculatedChecksum) {
- throw new ChecksumException("LogEntry is corrupt. Calculated checksum is "
- + calculatedChecksum + " but read checksum " + expectedChecksum,
- limiter.markPos);
+ final String s = StringUtils.format("Log entry corrupted: Calculated checksum is %08X but read checksum is %08X.",
+ calculatedChecksum, expectedChecksum);
+ throw new ChecksumException(s, limiter.markPos);
}
// parse the buffer
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index 81a9de2..0d407d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.storage;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -39,11 +40,17 @@ public class RaftStorage implements Closeable {
// TODO support multiple storage directories
private final RaftStorageDirectory storageDir;
private final StorageState state;
+ private final CorruptionPolicy logCorruptionPolicy;
private volatile MetaFile metaFile;
public RaftStorage(File dir, RaftServerConstants.StartupOption option)
throws IOException {
- storageDir = new RaftStorageDirectory(dir);
+ this(dir, option, CorruptionPolicy.getDefault());
+ }
+
+ public RaftStorage(File dir, RaftServerConstants.StartupOption option, CorruptionPolicy logCorruptionPolicy)
+ throws IOException {
+ this.storageDir = new RaftStorageDirectory(dir);
if (option == RaftServerConstants.StartupOption.FORMAT) {
if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
throw new IOException("Cannot format " + storageDir);
@@ -60,12 +67,17 @@ public class RaftStorage implements Closeable {
+ ". Its state: " + state);
}
}
+ this.logCorruptionPolicy = logCorruptionPolicy;
}
StorageState getState() {
return state;
}
+ public CorruptionPolicy getLogCorruptionPolicy() {
+ return logCorruptionPolicy;
+ }
+
private void format() throws IOException {
storageDir.clearDirectory();
metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
similarity index 83%
rename from ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
rename to ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 6bbd708..32d1217 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -25,9 +25,11 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.ChecksumException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -36,6 +38,8 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogFormat;
+import org.apache.ratis.server.RaftServerConfigKeys.Log;
+import org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -57,6 +61,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -312,4 +317,69 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
Assert.assertEquals(lastCommittedEntry.getTerm(), lastAppliedTermIndex.getTerm());
Assert.assertTrue(lastCommittedEntry.getIndex() <= lastAppliedTermIndex.getIndex());
}
+
+ @Test
+ public void testRestartWithCorruptedLogEntryWithWarnAndReturn() throws Exception {
+ final RaftProperties p = getProperties();
+ final Log.CorruptionPolicy policy = Log.corruptionPolicy(p);
+ Log.setCorruptionPolicy(p, Log.CorruptionPolicy.WARN_AND_RETURN);
+
+ runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry);
+
+ Log.setCorruptionPolicy(p, policy);
+ }
+
+ @Test
+ public void testRestartWithCorruptedLogEntryWithException() throws Exception {
+ final RaftProperties p = getProperties();
+ final Log.CorruptionPolicy policy = Log.corruptionPolicy(p);
+ Log.setCorruptionPolicy(p, Log.CorruptionPolicy.EXCEPTION);
+
+ testFailureCase("restart-fail-ChecksumException",
+ () -> runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry),
+ CompletionException.class, ChecksumException.class);
+
+ Log.setCorruptionPolicy(p, policy);
+ }
+
+ private void runTestRestartWithCorruptedLogEntry(CLUSTER cluster) throws Exception {
+ // this is the only server
+ final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId id = leader.getId();
+
+ // send a few messages
+ final SimpleMessage[] messages = SimpleMessage.create(10);
+ final SimpleMessage lastMessage = messages[messages.length - 1];
+ try (final RaftClient client = cluster.createClient()) {
+ for (SimpleMessage m : messages) {
+ Assert.assertTrue(client.send(m).isSuccess());
+ }
+
+ // assert that the last message exists
+ Assert.assertTrue(client.sendReadOnly(lastMessage).isSuccess());
+ }
+
+ final RaftLog log = leader.getState().getLog();
+ final long size = TestSegmentedRaftLog.getOpenSegmentSize(log);
+ leader.getProxy().close();
+
+ // corrupt the log
+ final File openLogFile = JavaUtils.attempt(() -> getOpenLogFile(leader),
+ 10, HUNDRED_MILLIS, id + "-getOpenLogFile", LOG);
+ try(final RandomAccessFile raf = new RandomAccessFile(openLogFile, "rw")) {
+ final long mid = size / 2;
+ raf.seek(mid);
+ for (long i = mid; i < size; i++) {
+ raf.write(0);
+ }
+ }
+
+ // after the log is corrupted and the server is restarted, the last entry should no longer exist.
+ cluster.restartServer(id, false);
+ testFailureCase("last-entry-not-found", () -> {
+ try (final RaftClient client = cluster.createClient()) {
+ client.sendReadOnly(lastMessage);
+ }
+ }, StateMachineException.class, IndexOutOfBoundsException.class);
+ }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index b03102c..fa89192 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -74,6 +74,10 @@ public class TestSegmentedRaftLog extends BaseTest {
LogUtils.setLogLevel(SegmentedRaftLog.LOG, Level.TRACE);
}
+ public static long getOpenSegmentSize(RaftLog raftLog) {
+ return ((SegmentedRaftLog)raftLog).getRaftLogCache().getOpenSegment().getTotalSize();
+ }
+
private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
private static final RaftGroupId groupId = RaftGroupId.randomId();
private static final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index da17dd2..86c8cb7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.ratis.server.storage;
import static org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_REGEX;
import org.apache.ratis.BaseTest;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
@@ -36,7 +34,6 @@ import org.mockito.internal.util.reflection.Whitebox;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -50,7 +47,7 @@ public class TestRaftStorage extends BaseTest {
private File storageDir;
@Before
- public void setup() throws Exception {
+ public void setup() {
storageDir = getTestDir();
}