You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/21 05:29:27 UTC

[incubator-ratis] branch master updated: RATIS-1255. Move RaftLog to raft-server-api. (#367)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c714aff  RATIS-1255. Move RaftLog to raft-server-api. (#367)
c714aff is described below

commit c714affca4da0637b877a53bc21e21f3583204ae
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 21 13:29:17 2020 +0800

    RATIS-1255. Move RaftLog to raft-server-api. (#367)
---
 .../org/apache/ratis/server/raftlog/RaftLog.java   | 173 +++++++++++++++++++++
 .../ratis/server/raftlog/RaftLogSequentialOps.java |   0
 .../apache/ratis/server/impl/RaftServerImpl.java   |   3 +-
 .../org/apache/ratis/server/impl/ServerState.java  |   4 +-
 .../raftlog/{RaftLog.java => RaftLogBase.java}     | 147 +++--------------
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |  10 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java |  12 +-
 .../test/java/org/apache/ratis/RaftAsyncTests.java |   5 +-
 .../test/java/org/apache/ratis/RaftTestUtil.java   |   3 +-
 .../server/impl/RaftReconfigurationBaseTest.java   |   8 +-
 .../ratis/server/impl/RaftServerTestUtil.java      |   4 +
 .../ratis/server/storage/RaftStorageTestUtils.java |   4 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    |   5 +-
 13 files changed, 223 insertions(+), 155 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
new file mode 100644
index 0000000..f84d010
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.raftlog;
+
+import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+/**
+ * {@link RaftLog} is a transaction log of a raft service.
+ */
+public interface RaftLog extends RaftLogSequentialOps, Closeable {
+  Logger LOG = LoggerFactory.getLogger(RaftLog.class);
+
+  /** The least valid log index, i.e. the index used when writing to an empty log. */
+  long LEAST_VALID_LOG_INDEX = 0L;
+  /** Invalid log index is used to indicate that the log index is missing. */
+  long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
+
+  /** Does this log contains the given {@link TermIndex}? */
+  default boolean contains(TermIndex ti) {
+    Objects.requireNonNull(ti, "ti == null");
+    return ti.equals(getTermIndex(ti.getIndex()));
+  }
+
+  /**
+   * Is the entry corresponding to the given {@link TermIndex} a configuration entry?
+   * In other words, the corresponding entry exists and
+   * {@link LogEntryProto#hasConfigurationEntry()} returns true.
+   */
+  boolean isConfigEntry(TermIndex ti);
+
+  /**
+   * @return null if the log entry is not found in this log;
+   *         otherwise, return the {@link TermIndex} of the log entry corresponding to the given index.
+   */
+  TermIndex getTermIndex(long index);
+
+  /**
+   * @return null if the log entry is not found in this log;
+   *         otherwise, return the log entry corresponding to the given index.
+   */
+  LogEntryProto get(long index) throws RaftLogIOException;
+
+  /**
+   * @return null if the log entry is not found in this log;
+   *         otherwise, return the {@link EntryWithData} corresponding to the given index.
+   */
+  EntryWithData getEntryWithData(long index) throws RaftLogIOException;
+
+  /**
+   * @param startIndex the starting log index (inclusive)
+   * @param endIndex the ending log index (exclusive)
+   * @return an array of {@link TermIndex} of all log entries within the given index range. Null if
+   *         startIndex is greater than the smallest available index.
+   */
+  TermIndex[] getEntries(long startIndex, long endIndex);
+
+  /** @return the index of the starting entry of this log. */
+  long getStartIndex();
+
+  /** @return the index of the next entry to append. */
+  default long getNextIndex() {
+    final TermIndex last = getLastEntryTermIndex();
+    if (last == null) {
+      // if the log is empty, the last committed index should be consistent with
+      // the last index included in the latest snapshot.
+      return getLastCommittedIndex() + 1;
+    }
+    return last.getIndex() + 1;
+  }
+
+  /** @return the index of the last entry that has been committed. */
+  long getLastCommittedIndex();
+
+  /** @return the index of the latest snapshot. */
+  long getSnapshotIndex();
+
+  /** @return the index of the last entry that has been flushed to the local storage. */
+  long getFlushIndex();
+
+  /** @return the {@link TermIndex} of the last log entry. */
+  TermIndex getLastEntryTermIndex();
+
+  /** @return the {@link RaftLogMetrics}. */
+  RaftLogMetrics getRaftLogMetrics();
+
+  /**
+   * Update the commit index.
+   * @param majorityIndex the index that has achieved majority.
+   * @param currentTerm the current term.
+   * @param isLeader Is this server the leader?
+   * @return true if commit index is changed; otherwise, return false.
+   */
+  boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader);
+
+  /**
+   * Update the snapshot index with the given index.
+   * Note that the commit index may also be changed by this update.
+   */
+  void updateSnapshotIndex(long newSnapshotIndex);
+
+  /** Open this log for read and write. */
+  void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException;
+
+  /**
+   * Purge asynchronously the log transactions.
+   * The implementation may choose to purge an index other than the suggested index.
+   *
+   * @param suggestedIndex the suggested index (inclusive) to be purged.
+   * @return the future of the actual purged log index.
+   */
+  CompletableFuture<Long> purge(long suggestedIndex);
+
+  /** Persist the given metadata. */
+  void persistMetadata(RaftStorageMetadata metadata) throws IOException;
+
+  /** Load metadata. */
+  RaftStorageMetadata loadMetadata() throws IOException;
+
+  /**
+   * A snapshot is installed so that the indices and other information of this log must be updated.
+   * This log may also purge the outdated entries.
+   *
+   * @return the future of the actual purged log index (inclusive).
+   */
+  CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex);
+
+  /**
+   * Log entry with state machine data.
+   *
+   * When both {@link LogEntryProto#hasStateMachineLogEntry()} and
+   * {@link StateMachineLogEntryProto#hasStateMachineEntry()} are true,
+   * the {@link StateMachineEntryProto} is removed from the original {@link LogEntryProto}
+   * before appending to this log.
+   * The {@link StateMachineEntryProto} is stored by the state machine but not in this log.
+   * When reading the log entry, this class rebuilds the original {@link LogEntryProto}
+   * containing both the log entry and the state machine data.
+   */
+  interface EntryWithData {
+    /** @return the serialized size including both log entry and state machine data. */
+    int getSerializedSize();
+
+    /** @return the {@link LogEntryProto} containing both the log entry and the state machine data. */
+    LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException;
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
similarity index 100%
rename from ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
rename to ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8939f4f..6bc6499 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -93,6 +93,7 @@ class RaftServerImpl implements RaftServer.Division,
   static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
   static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
   static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
+  static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
 
   class Info implements DivisionInfo {
     @Override
@@ -1178,7 +1179,7 @@ class RaftServerImpl implements RaftServer.Division,
     commitInfos.forEach(commitInfoCache::update);
 
     if (!isHeartbeat) {
-      CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
+      CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
     }
     return JavaUtils.allOf(futures).whenCompleteAsync(
         (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index fe5e1f4..466eeb2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -222,7 +222,7 @@ class ServerState implements Closeable {
   }
 
   void persistMetadata() throws IOException {
-    log.writeMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
+    log.persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
   }
 
   /**
@@ -413,7 +413,7 @@ class ServerState implements Closeable {
   }
 
   void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
-    log.syncWithSnapshot(lastTermIndexInSnapshot.getIndex());
+    log.onSnapshotInstalled(lastTermIndexInSnapshot.getIndex());
     latestInstalledSnapshot.set(lastTermIndexInSnapshot);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
similarity index 76%
rename from ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
rename to ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 3f65b83..3e06963 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -21,11 +21,9 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
-import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftConfiguration;
-import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -33,13 +31,9 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
@@ -55,10 +49,7 @@ import java.util.function.LongSupplier;
  * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored
  *    in segments.
  */
-public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
-  public static final String LOG_SYNC = JavaUtils.getClassSimpleName(RaftLog.class) + ".logSync";
-
+public abstract class RaftLogBase implements RaftLog {
   private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getName(), s);
   private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getName(), s);
 
@@ -87,7 +78,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   private volatile LogEntryProto lastMetadataEntry = null;
 
-  protected RaftLog(RaftGroupMemberId memberId,
+  protected RaftLogBase(RaftGroupMemberId memberId,
                     LongSupplier getSnapshotIndexFromStateMachine,
                     RaftProperties properties) {
     this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
@@ -102,12 +93,12 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     this.getSnapshotIndexFromStateMachine = getSnapshotIndexFromStateMachine;
   }
 
-  public abstract RaftLogMetrics getRaftLogMetrics();
-
+  @Override
   public long getLastCommittedIndex() {
     return commitIndex.get();
   }
 
+  @Override
   public long getSnapshotIndex() {
     return snapshotIndex.get();
   }
@@ -116,17 +107,12 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     state.assertOpen();
   }
 
+  /** Is this log already opened? */
   public boolean isOpened() {
     return state.isOpened();
   }
 
-  /**
-   * Update the last committed index.
-   * @param majorityIndex the index that has achieved majority.
-   * @param currentTerm the current term.
-   * @param isLeader Is this server the leader?
-   * @return true if update is applied; otherwise, return false, i.e. no update required.
-   */
+  @Override
   public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) {
     try(AutoCloseableLock writeLock = writeLock()) {
       final long oldCommittedIndex = getLastCommittedIndex();
@@ -152,11 +138,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
       updateSnapshotIndex(getSnapshotIndexFromStateMachine.getAsLong());
   }
 
-  /**
-   * Update the last committed index and snapshotIndex with the last index in
-   * the snapshot.
-   * @param newSnapshotIndex the last index in the snapshot
-   */
+  @Override
   public void updateSnapshotIndex(long newSnapshotIndex) {
     try(AutoCloseableLock writeLock = writeLock()) {
       final long oldSnapshotIndex = getSnapshotIndex();
@@ -170,29 +152,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     }
   }
 
-  /**
-   * Does the log contains the given term and index? Used to check the
-   * consistency between the local log of a follower and the log entries sent
-   * by the leader.
-   */
-  public boolean contains(TermIndex ti) {
-    Objects.requireNonNull(ti, "ti == null");
-    return ti.equals(getTermIndex(ti.getIndex()));
-  }
-
-  /**
-   * @return the index of the next log entry to append.
-   */
-  public long getNextIndex() {
-    final TermIndex last = getLastEntryTermIndex();
-    if (last == null) {
-      // if the log is empty, the last committed index should be consistent with
-      // the last index included in the latest snapshot.
-      return getLastCommittedIndex() + 1;
-    }
-    return last.getIndex() + 1;
-  }
-
   @Override
   public final long append(long term, TransactionContext transaction) throws StateMachineException {
     return runner.runSequentially(() -> appendImpl(term, transaction));
@@ -268,6 +227,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     }
     return true;
   }
+
   @Override
   public final long append(long term, RaftConfiguration configuration) {
     return runner.runSequentially(() -> appendImpl(term, configuration));
@@ -282,6 +242,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     }
   }
 
+  @Override
   public final void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
     openImpl(lastIndexInSnapshot, e -> {
       if (e.hasMetadataEntry()) {
@@ -303,48 +264,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
   protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
   }
 
-  public abstract long getStartIndex();
-
-  /**
-   * Get the log entry of the given index.
-   *
-   * @param index The given index.
-   * @return The log entry associated with the given index.
-   *         Null if there is no log entry with the index.
-   */
-  public abstract LogEntryProto get(long index) throws RaftLogIOException;
-
-  /**
-   * Get the log entry of the given index along with the state machine data.
-   *
-   * @param index The given index.
-   * @return The log entry associated with the given index.
-   *         Null if there is no log entry with the index.
-   */
-  public abstract EntryWithData getEntryWithData(long index) throws RaftLogIOException;
-
-  /**
-   * Get the TermIndex information of the given index.
-   *
-   * @param index The given index.
-   * @return The TermIndex of the log entry associated with the given index.
-   *         Null if there is no log entry with the index.
-   */
-  public abstract TermIndex getTermIndex(long index);
-
-  /**
-   * @param startIndex the starting log index (inclusive)
-   * @param endIndex the ending log index (exclusive)
-   * @return TermIndex of all log entries within the given index range. Null if
-   *         startIndex is greater than the smallest available index.
-   */
-  public abstract TermIndex[] getEntries(long startIndex, long endIndex);
-
-  /**
-   * @return the last log entry's term and index.
-   */
-  public abstract TermIndex getLastEntryTermIndex();
-
   /**
    * Validate the term and index of entry w.r.t RaftLog
    */
@@ -378,13 +297,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   protected abstract CompletableFuture<Long> truncateImpl(long index);
 
-  /**
-   * Purge asynchronously the log transactions.
-   * The implementation may choose to purge an index other than the suggested index.
-   *
-   * @param suggestedIndex the suggested index (inclusive) to be purged.
-   * @return the future of the actual purged log index.
-   */
+  @Override
   public final CompletableFuture<Long> purge(long suggestedIndex) {
     final long lastPurge = purgeIndex.get();
     if (suggestedIndex - lastPurge < purgeGap) {
@@ -417,30 +330,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   protected abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries);
 
-  /**
-   * @return the index of the last entry that has been flushed to the local storage.
-   */
-  public abstract long getFlushIndex();
-
-  /**
-   * Write and flush the metadata (votedFor and term) into the meta file.
-   *
-   * We need to guarantee that the order of writeMetadata calls is the same with
-   * that when we change the in-memory term/votedFor. Otherwise we may persist
-   * stale term/votedFor in file.
-   *
-   * Since the leader change is not frequent, currently we simply put this call
-   * in the RaftPeer's lock. Later we can use an IO task queue to enforce the
-   * order.
-   */
-  public abstract void writeMetadata(RaftStorageMetadata metadata) throws IOException;
-
-  public abstract RaftStorageMetadata loadMetadata() throws IOException;
-
-  public abstract CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex);
-
-  public abstract boolean isConfigEntry(TermIndex ti);
-
   @Override
   public String toString() {
     return getName() + ":" + state + ":c" + getLastCommittedIndex();
@@ -471,26 +360,28 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     return name;
   }
 
+  protected EntryWithData newEntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
+    return new EntryWithDataImpl(logEntry, future);
+  }
+
   /**
    * Holds proto entry along with future which contains read state machine data
    */
-  public class EntryWithData {
+  class EntryWithDataImpl implements EntryWithData {
     private final LogEntryProto logEntry;
     private final CompletableFuture<ByteString> future;
 
-    public EntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
+    EntryWithDataImpl(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
       this.logEntry = logEntry;
       this.future = future;
     }
 
-    public long getIndex() {
-      return logEntry.getIndex();
-    }
-
+    @Override
     public int getSerializedSize() {
       return LogProtoUtils.getSerializedSize(logEntry);
     }
 
+    @Override
     public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
       LogEntryProto entryProto;
       if (future == null) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 9dbf628..7fa91bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -22,7 +22,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.server.metrics.RaftLogMetricsBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.Preconditions;
@@ -37,7 +37,7 @@ import java.util.function.LongSupplier;
 /**
  * A simple RaftLog implementation in memory. Used only for testing.
  */
-public class MemoryRaftLog extends RaftLog {
+public class MemoryRaftLog extends RaftLogBase {
   static class EntryList {
     private final List<LogEntryProto> entries = new ArrayList<>();
 
@@ -96,7 +96,7 @@ public class MemoryRaftLog extends RaftLog {
 
   @Override
   public EntryWithData getEntryWithData(long index) {
-    return new EntryWithData(get(index), null);
+    return newEntryWithData(get(index), null);
   }
 
   @Override
@@ -217,7 +217,7 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public void writeMetadata(RaftStorageMetadata newMetadata) {
+  public void persistMetadata(RaftStorageMetadata newMetadata) {
     metadata.set(newMetadata);
   }
 
@@ -227,7 +227,7 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
+  public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
     return CompletableFuture.completedFuture(lastSnapshotIndex);
     // do nothing
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 070c5fa..1142bcf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -24,7 +24,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -76,7 +76,7 @@ import com.codahale.metrics.Timer;
  * in segments should be no smaller than the last index of snapshot, otherwise
  * we may have hole when append further log.
  */
-public class SegmentedRaftLog extends RaftLog {
+public class SegmentedRaftLog extends RaftLogBase {
   /**
    * I/O task definitions.
    */
@@ -286,7 +286,7 @@ public class SegmentedRaftLog extends RaftLog {
       throw new RaftLogIOException("Log entry not found: index = " + index);
     }
     if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
-      return new EntryWithData(entry, null);
+      return newEntryWithData(entry, null);
     }
 
     try {
@@ -297,7 +297,7 @@ public class SegmentedRaftLog extends RaftLog {
           return null;
         });
       }
-      return new EntryWithData(entry, future);
+      return newEntryWithData(entry, future);
     } catch (Exception e) {
       final String err = getName() + ": Failed readStateMachineData for " +
           LogProtoUtils.toLogEntryString(entry);
@@ -465,7 +465,7 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public void writeMetadata(RaftStorageMetadata metadata) throws IOException {
+  public void persistMetadata(RaftStorageMetadata metadata) throws IOException {
     storage.getMetadataFile().persist(metadata);
   }
 
@@ -475,7 +475,7 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
+  public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
     fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
     // TODO purge normal/tmp/corrupt snapshot files
     // if the last index in snapshot is larger than the index of the last
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index f614a9f..598faa3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -39,7 +39,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -75,8 +75,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
 
   public static final int NUM_SERVERS = 3;
 
-  private static final DelayLocalExecutionInjection logSyncDelay =
-      new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+  private static final DelayLocalExecutionInjection logSyncDelay = RaftServerTestUtil.getLogSyncDelay();
 
   {
     getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 7abddf3..6cfe89f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -34,6 +34,7 @@ import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -484,7 +485,7 @@ public interface RaftTestUtil {
   }
 
   static LogEntryProto getLastEntry(LogEntryBodyCase targetCase, RaftLog raftLog) throws Exception {
-    try(AutoCloseableLock readLock = raftLog.readLock()) {
+    try(AutoCloseableLock readLock = ((RaftLogBase)raftLog).readLock()) {
       long i = raftLog.getNextIndex() - 1;
       for(; i >= 0; i--) {
         final LogEntryProto entry = raftLog.get(i);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 8ee057e..6bd11f7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -40,6 +40,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster.PeerChanges;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.storage.RaftStorageTestUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
@@ -69,8 +70,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
     Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
   }
 
-  private static final DelayLocalExecutionInjection logSyncDelay =
-      new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+  private static final DelayLocalExecutionInjection logSyncDelay = RaftServerTestUtil.getLogSyncDelay();
   private static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
       new DelayLocalExecutionInjection(LeaderStateImpl.APPEND_PLACEHOLDER);
 
@@ -573,7 +573,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
   }
 
   void runTestRevertConfigurationChange(CLUSTER cluster) throws Exception {
-    RaftLog log2 = null;
+    RaftLogBase log2 = null;
     try {
       RaftTestUtil.waitForLeader(cluster);
 
@@ -581,7 +581,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
       final RaftPeerId leaderId = leader.getId();
 
       final RaftLog log = leader.getRaftLog();
-      log2 = log;
+      log2 = (RaftLogBase) log;
       Thread.sleep(1000);
 
       // we block the incoming msg for the leader and block its requests to
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index fc6bc5a..88238ca 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -52,6 +52,10 @@ public class RaftServerTestUtil {
   public static final RaftGroupMemberId TEST_MEMBER_ID = RaftGroupMemberId.valueOf(
       RaftPeerId.valueOf("test"), RaftGroupId.emptyGroupId());
 
+  public static DelayLocalExecutionInjection getLogSyncDelay() {
+    return new DelayLocalExecutionInjection(RaftServerImpl.LOG_SYNC);
+  }
+
   public static void setStateMachineUpdaterLogLevel(Level level) {
     Log4jUtils.setLogLevel(StateMachineUpdater.LOG, level);
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index b498b29..9f192e4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -23,7 +23,7 @@ import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.RATIS_LOG_
 import org.apache.ratis.metrics.RatisMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.util.AutoCloseableLock;
 
@@ -45,7 +45,7 @@ public interface RaftStorageTestUtils {
         + "." + memberId + "." + metricName;
   }
 
-  static void printLog(RaftLog log, Consumer<String> println) {
+  static void printLog(RaftLogBase log, Consumer<String> println) {
     if (log == null) {
       println.accept("log == null");
       return;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index fe97f3e..2bc208b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -31,7 +31,6 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.server.RetryCache;
-import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.metrics.RaftLogMetricsBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -290,7 +289,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex()
           + " (or snapshot index " + raftLog.getSnapshotIndex() + ") is greater than 1"));
 
-      raftLog.syncWithSnapshot(raftLog.getLastEntryTermIndex().getIndex());
+      raftLog.onSnapshotInstalled(raftLog.getLastEntryTermIndex().getIndex());
       try {
         // append entry fails if there are no log entries && log's snapshotIndex + 1 < incoming log entry.
         raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0))
@@ -324,7 +323,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       entries.subList(0, entries.size() - 1).stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
 
-      raftLog.syncWithSnapshot(desiredSnapshotIndex);
+      raftLog.onSnapshotInstalled(desiredSnapshotIndex);
       // Try appending last entry after snapshot + purge.
       CompletableFuture<Long> appendEntryFuture =
           raftLog.appendEntry(entries.get(entries.size() - 1));