You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/09/23 21:00:20 UTC

[incubator-ratis] branch master updated: RATIS-677. Log entry marked corrupt due to ChecksumException.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c564743  RATIS-677. Log entry marked corrupt due to ChecksumException.
c564743 is described below

commit c564743fff6271b6d3a827997b72f3bb5be64b04
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Sep 23 13:57:40 2019 -0700

    RATIS-677. Log entry marked corrupt due to ChecksumException.
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 28 +++++++++
 .../org/apache/ratis/server/impl/ServerState.java  |  3 +-
 .../ratis/server/raftlog/segmented/LogSegment.java | 38 ++++++++----
 .../raftlog/segmented/SegmentedRaftLogReader.java  |  6 +-
 .../apache/ratis/server/storage/RaftStorage.java   | 14 ++++-
 .../apache/ratis/server/ServerRestartTests.java    | 70 ++++++++++++++++++++++
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  4 ++
 .../ratis/server/storage/TestRaftStorage.java      |  7 +--
 8 files changed, 149 insertions(+), 21 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 405d684..c5e9504 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -26,8 +26,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.apache.ratis.conf.ConfUtils.*;
 
@@ -216,6 +218,32 @@ public interface RaftServerConfigKeys {
       setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum);
     }
 
+    /** The policy to handle corrupted raft log. */
+    enum CorruptionPolicy {
+      /** Rethrow the exception. */
+      EXCEPTION,
+      /** Print a warn log message and return all uncorrupted log entries up to the corruption. */
+      WARN_AND_RETURN;
+
+      public static CorruptionPolicy getDefault() {
+        return EXCEPTION;
+      }
+
+      public static <T> CorruptionPolicy get(T supplier, Function<T, CorruptionPolicy> getMethod) {
+        return Optional.ofNullable(supplier).map(getMethod).orElse(getDefault());
+      }
+    }
+
+    String CORRUPTION_POLICY_KEY = PREFIX + ".corruption.policy";
+    CorruptionPolicy CORRUPTION_POLICY_DEFAULT = CorruptionPolicy.getDefault();
+    static CorruptionPolicy corruptionPolicy(RaftProperties properties) {
+      return get(properties::getEnum,
+          CORRUPTION_POLICY_KEY, CORRUPTION_POLICY_DEFAULT, getDefaultLog());
+    }
+    static void setCorruptionPolicy(RaftProperties properties, CorruptionPolicy corruptionPolicy) {
+      set(properties::setEnum, CORRUPTION_POLICY_KEY, corruptionPolicy);
+    }
+
     interface StateMachineData {
       String PREFIX = Log.PREFIX + ".statemachine.data";
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 6805ae4..82993b5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -105,7 +105,8 @@ public class ServerState implements Closeable {
     // use full uuid string to create a subdirectory
     final File dir = chooseStorageDir(RaftServerConfigKeys.storageDirs(prop),
         group.getGroupId().getUuid().toString());
-    storage = new RaftStorage(dir, RaftServerConstants.StartupOption.REGULAR);
+    storage = new RaftStorage(dir, RaftServerConstants.StartupOption.REGULAR,
+        RaftServerConfigKeys.Log.corruptionPolicy(prop));
     snapshotManager = new SnapshotManager(storage, id);
 
     long lastApplied = initStatemachine(stateMachine, group.getGroupId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2ffc907..12bd195 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.raftlog.segmented;
 
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -81,14 +82,14 @@ class LogSegment implements Comparable<Long> {
   }
 
   @VisibleForTesting
-  static LogSegment newCloseSegment(RaftStorage storage,
-      long start, long end) {
+  static LogSegment newCloseSegment(RaftStorage storage, long start, long end) {
     Preconditions.assertTrue(start >= 0 && end >= start);
     return new LogSegment(storage, false, start, end);
   }
 
-  private static int readSegmentFile(File file, long start, long end,
-      boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws IOException {
+  private static int readSegmentFile(File file, long start, long end, boolean isOpen,
+      CorruptionPolicy corruptionPolicy, Consumer<LogEntryProto> entryConsumer)
+      throws IOException {
     int count = 0;
     try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen)) {
       for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
@@ -102,19 +103,31 @@ class LogSegment implements Comparable<Long> {
         }
         count++;
       }
+    } catch (IOException ioe) {
+      switch (corruptionPolicy) {
+        case EXCEPTION: throw ioe;
+        case WARN_AND_RETURN:
+          LOG.warn("Failed to read segment file {} (start={}, end={}, isOpen? {}): only {} entries read successfully",
+              file, start, end, isOpen, count, ioe);
+          break;
+        default:
+          throw new IllegalStateException("Unexpected enum value: " + corruptionPolicy
+              + ", class=" + CorruptionPolicy.class);
+      }
     }
+
     return count;
   }
 
-  static LogSegment loadSegment(RaftStorage storage, File file,
-      long start, long end, boolean isOpen,
+  static LogSegment loadSegment(RaftStorage storage, File file, long start, long end, boolean isOpen,
       boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer)
       throws IOException {
     final LogSegment segment = isOpen ?
         LogSegment.newOpenSegment(storage, start) :
         LogSegment.newCloseSegment(storage, start, end);
 
-    final int entryCount = readSegmentFile(file, start, end, isOpen, entry -> {
+    final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
+    final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, entry -> {
       segment.append(keepEntryInCache || isOpen, entry);
       if (logConsumer != null) {
         logConsumer.accept(entry);
@@ -154,7 +167,7 @@ class LogSegment implements Comparable<Long> {
       final File file = getSegmentFile();
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
-      readSegmentFile(file, startIndex, endIndex, isOpen,
+      readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(),
           entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
       loadingTimes.incrementAndGet();
       return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
@@ -168,11 +181,11 @@ class LogSegment implements Comparable<Long> {
   }
 
   private volatile boolean isOpen;
-  private long totalSize;
+  private long totalSize = SegmentedRaftLogFormat.getHeaderLength();
   private final long startIndex;
   private volatile long endIndex;
   private final RaftStorage storage;
-  private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader();
+  private final LogEntryLoader cacheLoader = new LogEntryLoader();
   /** later replace it with a metric */
   private final AtomicInteger loadingTimes = new AtomicInteger();
 
@@ -191,7 +204,6 @@ class LogSegment implements Comparable<Long> {
     this.isOpen = isOpen;
     this.startIndex = start;
     this.endIndex = end;
-    totalSize = SegmentedRaftLogFormat.getHeaderLength();
   }
 
   long getStartIndex() {
@@ -210,6 +222,10 @@ class LogSegment implements Comparable<Long> {
     return Math.toIntExact(endIndex - startIndex + 1);
   }
 
+  CorruptionPolicy getLogCorruptionPolicy() {
+    return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
+  }
+
   void appendToOpenSegment(LogEntryProto entry) {
     Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this);
     append(true, entry);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index f6203c0..f90d4c6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -294,9 +294,9 @@ class SegmentedRaftLogReader implements Closeable {
     int expectedChecksum = in.readInt();
     int calculatedChecksum = (int) checksum.getValue();
     if (expectedChecksum != calculatedChecksum) {
-      throw new ChecksumException("LogEntry is corrupt. Calculated checksum is "
-          + calculatedChecksum + " but read checksum " + expectedChecksum,
-          limiter.markPos);
+      final String s = StringUtils.format("Log entry corrupted: Calculated checksum is %08X but read checksum is %08X.",
+          calculatedChecksum, expectedChecksum);
+      throw new ChecksumException(s, limiter.markPos);
     }
 
     // parse the buffer
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index 81a9de2..0d407d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.storage;
 
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -39,11 +40,17 @@ public class RaftStorage implements Closeable {
   // TODO support multiple storage directories
   private final RaftStorageDirectory storageDir;
   private final StorageState state;
+  private final CorruptionPolicy logCorruptionPolicy;
   private volatile MetaFile metaFile;
 
   public RaftStorage(File dir, RaftServerConstants.StartupOption option)
       throws IOException {
-    storageDir = new RaftStorageDirectory(dir);
+    this(dir, option, CorruptionPolicy.getDefault());
+  }
+
+  public RaftStorage(File dir, RaftServerConstants.StartupOption option, CorruptionPolicy logCorruptionPolicy)
+      throws IOException {
+    this.storageDir = new RaftStorageDirectory(dir);
     if (option == RaftServerConstants.StartupOption.FORMAT) {
       if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
         throw new IOException("Cannot format " + storageDir);
@@ -60,12 +67,17 @@ public class RaftStorage implements Closeable {
             + ". Its state: " + state);
       }
     }
+    this.logCorruptionPolicy = logCorruptionPolicy;
   }
 
   StorageState getState() {
     return state;
   }
 
+  public CorruptionPolicy getLogCorruptionPolicy() {
+    return logCorruptionPolicy;
+  }
+
   private void format() throws IOException {
     storageDir.clearDirectory();
     metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
similarity index 83%
rename from ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
rename to ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 6bbd708..32d1217 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -25,9 +25,11 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.ChecksumException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -36,6 +38,8 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogFormat;
+import org.apache.ratis.server.RaftServerConfigKeys.Log;
+import org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
@@ -57,6 +61,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -312,4 +317,69 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     Assert.assertEquals(lastCommittedEntry.getTerm(), lastAppliedTermIndex.getTerm());
     Assert.assertTrue(lastCommittedEntry.getIndex() <= lastAppliedTermIndex.getIndex());
   }
+
+  @Test
+  public void testRestartWithCorruptedLogEntryWithWarnAndReturn() throws Exception {
+    final RaftProperties p = getProperties();
+    final Log.CorruptionPolicy policy = Log.corruptionPolicy(p);
+    Log.setCorruptionPolicy(p, Log.CorruptionPolicy.WARN_AND_RETURN);
+
+    runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry);
+
+    Log.setCorruptionPolicy(p, policy);
+  }
+
+  @Test
+  public void testRestartWithCorruptedLogEntryWithException() throws Exception {
+    final RaftProperties p = getProperties();
+    final Log.CorruptionPolicy policy = Log.corruptionPolicy(p);
+    Log.setCorruptionPolicy(p, Log.CorruptionPolicy.EXCEPTION);
+
+    testFailureCase("restart-fail-ChecksumException",
+        () -> runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry),
+        CompletionException.class, ChecksumException.class);
+
+    Log.setCorruptionPolicy(p, policy);
+  }
+
+  private void runTestRestartWithCorruptedLogEntry(CLUSTER cluster) throws Exception {
+    // this is the only server
+    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId id = leader.getId();
+
+    // send a few messages
+    final SimpleMessage[] messages = SimpleMessage.create(10);
+    final SimpleMessage lastMessage = messages[messages.length - 1];
+    try (final RaftClient client = cluster.createClient()) {
+      for (SimpleMessage m : messages) {
+        Assert.assertTrue(client.send(m).isSuccess());
+      }
+
+      // assert that the last message exists
+      Assert.assertTrue(client.sendReadOnly(lastMessage).isSuccess());
+    }
+
+    final RaftLog log = leader.getState().getLog();
+    final long size = TestSegmentedRaftLog.getOpenSegmentSize(log);
+    leader.getProxy().close();
+
+    // corrupt the log
+    final File openLogFile = JavaUtils.attempt(() -> getOpenLogFile(leader),
+        10, HUNDRED_MILLIS, id + "-getOpenLogFile", LOG);
+    try(final RandomAccessFile raf = new RandomAccessFile(openLogFile, "rw")) {
+      final long mid = size / 2;
+      raf.seek(mid);
+      for (long i = mid; i < size; i++) {
+        raf.write(0);
+      }
+    }
+
+    // after the log is corrupted and the server is restarted, the last entry should no longer exist.
+    cluster.restartServer(id, false);
+    testFailureCase("last-entry-not-found", () -> {
+      try (final RaftClient client = cluster.createClient()) {
+        client.sendReadOnly(lastMessage);
+      }
+    }, StateMachineException.class, IndexOutOfBoundsException.class);
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index b03102c..fa89192 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -74,6 +74,10 @@ public class TestSegmentedRaftLog extends BaseTest {
     LogUtils.setLogLevel(SegmentedRaftLog.LOG, Level.TRACE);
   }
 
+  public static long getOpenSegmentSize(RaftLog raftLog) {
+    return ((SegmentedRaftLog)raftLog).getRaftLogCache().getOpenSegment().getTotalSize();
+  }
+
   private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
   private static final RaftGroupId groupId = RaftGroupId.randomId();
   private static final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index da17dd2..86c8cb7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.ratis.server.storage;
 import static org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_REGEX;
 
 import org.apache.ratis.BaseTest;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
@@ -36,7 +34,6 @@ import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -50,7 +47,7 @@ public class TestRaftStorage extends BaseTest {
   private File storageDir;
 
   @Before
-  public void setup() throws Exception {
+  public void setup() {
     storageDir = getTestDir();
   }