You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/22 02:42:51 UTC

[incubator-ratis] branch master updated: RATIS-1256. Leader updateCommit should use the updated index. (#369)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 195c572  RATIS-1256. Leader updateCommit should use the updated index. (#369)
195c572 is described below

commit 195c572024fc5fd88d00c3c0985c01215d2719aa
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Dec 22 10:41:24 2020 +0800

    RATIS-1256. Leader updateCommit should use the updated index. (#369)
    
    * RATIS-1256. Leader updateCommit should use the updated index.
    
    * Fix a bug.
---
 .../ratis/server/metrics/RaftLogMetrics.java       |  4 +-
 .../apache/ratis/server/protocol/TermIndex.java    | 12 +--
 .../ratis/server/raftlog/LogEntryHeader.java       | 86 ++++++++++++++++++++++
 .../org/apache/ratis/server/raftlog/RaftLog.java   | 13 +---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 86 ++++++++++------------
 .../ratis/server/metrics/RaftLogMetricsBase.java   | 22 ++++--
 .../ratis/server/raftlog/memory/MemoryRaftLog.java | 20 ++---
 .../ratis/server/raftlog/segmented/LogSegment.java | 23 ++----
 .../server/raftlog/segmented/SegmentedRaftLog.java |  8 +-
 .../raftlog/segmented/SegmentedRaftLogCache.java   | 19 ++---
 .../org/apache/ratis/OutputStreamBaseTest.java     |  9 ++-
 .../test/java/org/apache/ratis/RaftTestUtil.java   |  8 +-
 .../ratis/datastream/DataStreamTestUtils.java      |  3 +-
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java    |  6 +-
 .../server/raftlog/memory/MemoryRaftLogTest.java   |  5 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  5 +-
 .../segmented/TestSegmentedRaftLogCache.java       |  3 +-
 17 files changed, 200 insertions(+), 132 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
index 1e0ec99..42c1605 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -17,10 +17,10 @@
  */
 package org.apache.ratis.server.metrics;
 
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 
 /** Metrics for a raft log. */
 public interface RaftLogMetrics {
   /** A log entry just has been committed. */
-  void onLogEntryCommitted(LogEntryProto proto);
+  void onLogEntryCommitted(LogEntryHeader header);
 }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index c4f90b9..7def686 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -41,6 +41,13 @@ public interface TermIndex extends Comparable<TermIndex> {
         .build();
   }
 
+  @Override
+  default int compareTo(TermIndex that) {
+    return Comparator.comparingLong(TermIndex::getTerm)
+        .thenComparingLong(TermIndex::getIndex)
+        .compare(this, that);
+  }
+
   /** @return a {@link TermIndex} object from the given proto. */
   static TermIndex valueOf(TermIndexProto proto) {
     return Optional.ofNullable(proto).map(p -> valueOf(p.getTerm(), p.getIndex())).orElse(null);
@@ -65,11 +72,6 @@ public interface TermIndex extends Comparable<TermIndex> {
       }
 
       @Override
-      public int compareTo(TermIndex that) {
-        return Comparator.comparingLong(TermIndex::getTerm).thenComparingLong(TermIndex::getIndex).compare(this, that);
-      }
-
-      @Override
       public boolean equals(Object obj) {
         if (obj == this) {
           return true;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/LogEntryHeader.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/LogEntryHeader.java
new file mode 100644
index 0000000..5300ba5
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/LogEntryHeader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.raftlog;
+
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
+import org.apache.ratis.server.protocol.TermIndex;
+
+import java.util.Comparator;
+
+/** The header of a {@link LogEntryProto} including {@link TermIndex} and {@link LogEntryBodyCase}. */
+public interface LogEntryHeader extends Comparable<LogEntryHeader> {
+  LogEntryHeader[] EMPTY_ARRAY = {};
+
+  /** @return the {@link TermIndex}. */
+  TermIndex getTermIndex();
+
+  default long getTerm() {
+    return getTermIndex().getTerm();
+  }
+
+  default long getIndex() {
+    return getTermIndex().getIndex();
+  }
+
+  /** @return the {@link LogEntryBodyCase}. */
+  LogEntryBodyCase getLogEntryBodyCase();
+
+  static LogEntryHeader valueOf(LogEntryProto entry) {
+    return valueOf(TermIndex.valueOf(entry), entry.getLogEntryBodyCase());
+  }
+
+  static LogEntryHeader valueOf(TermIndex ti, LogEntryBodyCase logEntryBodyCase) {
+    return new LogEntryHeader() {
+      @Override
+      public TermIndex getTermIndex() {
+        return ti;
+      }
+
+      @Override
+      public LogEntryBodyCase getLogEntryBodyCase() {
+        return logEntryBodyCase;
+      }
+
+      @Override
+      public int compareTo(LogEntryHeader that) {
+        return Comparator.comparing(LogEntryHeader::getTermIndex)
+            .thenComparing(LogEntryHeader::getLogEntryBodyCase)
+            .compare(this, that);
+      }
+
+      @Override
+      public int hashCode() {
+        return ti.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        } else if (!(obj instanceof LogEntryHeader)) {
+          return false;
+        }
+
+        final LogEntryHeader that = (LogEntryHeader) obj;
+        return this.getLogEntryBodyCase() == that.getLogEntryBodyCase()
+            && this.getTermIndex().equals(that.getTermIndex());
+      }
+    };
+  }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index f84d010..e504462 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -50,13 +50,6 @@ public interface RaftLog extends RaftLogSequentialOps, Closeable {
   }
 
   /**
-   * Is the entry corresponding to the given {@link TermIndex} a configuration entry?
-   * In other words, the corresponding entry exists and
-   * {@link LogEntryProto#hasConfigurationEntry()} returns true.
-   */
-  boolean isConfigEntry(TermIndex ti);
-
-  /**
    * @return null if the log entry is not found in this log;
    *         otherwise, return the {@link TermIndex} of the log entry corresponding to the given index.
    */
@@ -77,10 +70,10 @@ public interface RaftLog extends RaftLogSequentialOps, Closeable {
   /**
    * @param startIndex the starting log index (inclusive)
    * @param endIndex the ending log index (exclusive)
-   * @return an array of {@link TermIndex} of all log entries within the given index range. Null if
-   *         startIndex is greater than the smallest available index.
+   * @return null if entries are unavailable in this log;
+   *         otherwise, return the log entry headers within the given index range.
    */
-  TermIndex[] getEntries(long startIndex, long endIndex);
+  LogEntryHeader[] getEntries(long startIndex, long endIndex);
 
   /** @return the index of the starting entry of this log. */
   long getStartIndex();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 363ae3a..5db9158 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -39,9 +40,9 @@ import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.metrics.LogAppenderMetrics;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.CollectionUtils;
@@ -719,31 +720,34 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
+  private void updateCommit(LogEntryHeader[] entriesToCommit) {
+    final long newCommitIndex = raftLog.getLastCommittedIndex();
+    logMetadata(newCommitIndex);
+    commitIndexChanged();
+
+    boolean hasConfiguration = false;
+    for (LogEntryHeader entry : entriesToCommit) {
+      if (entry.getIndex() > newCommitIndex) {
+        break;
+      }
+      hasConfiguration |= entry.getLogEntryBodyCase() == LogEntryBodyCase.CONFIGURATIONENTRY;
+      raftLog.getRaftLogMetrics().onLogEntryCommitted(entry);
+    }
+    if (hasConfiguration) {
+      checkAndUpdateConfiguration();
+    }
+  }
+
   private void updateCommit(long majority, long min) {
     final long oldLastCommitted = raftLog.getLastCommittedIndex();
     if (majority > oldLastCommitted) {
-      // copy the entries out from the raftlog, in order to prevent that
-      // the log gets purged after the statemachine does a snapshot
-      final TermIndex[] entriesToCommit = raftLog.getEntries(
-          oldLastCommitted + 1, majority + 1);
+      // Get the headers before updating commit index since the log can be purged after a snapshot
+      final LogEntryHeader[] entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, majority + 1);
 
       if (server.getState().updateCommitIndex(majority, currentTerm, true)) {
-        watchRequests.update(ReplicationLevel.MAJORITY, majority);
-        logMetadata(majority);
-        commitIndexChanged();
-      }
-
-      try {
-        for (TermIndex entry : entriesToCommit) {
-          raftLog.getRaftLogMetrics().onLogEntryCommitted(raftLog.get(entry.getIndex()));
-        }
-      } catch (RaftLogIOException e) {
-        LOG.error("Caught exception reading from RaftLog", e);
+        updateCommit(entriesToCommit);
       }
-
-      checkAndUpdateConfiguration(entriesToCommit);
     }
-
     watchRequests.update(ReplicationLevel.ALL, min);
   }
 
@@ -752,36 +756,24 @@ class LeaderStateImpl implements LeaderState {
     notifySenders();
   }
 
-  private boolean committedConf(TermIndex[] entries) {
-    final long currentCommitted = raftLog.getLastCommittedIndex();
-    for (TermIndex entry : entries) {
-      if (entry.getIndex() <= currentCommitted && raftLog.isConfigEntry(entry)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void checkAndUpdateConfiguration(TermIndex[] entriesToCheck) {
+  private void checkAndUpdateConfiguration() {
     final RaftConfigurationImpl conf = server.getRaftConf();
-    if (committedConf(entriesToCheck)) {
-      if (conf.isTransitional()) {
-        replicateNewConf();
-      } else { // the (new) log entry has been committed
-        pendingRequests.replySetConfiguration(server::newSuccessReply);
-        // if the leader is not included in the current configuration, step down
-        if (!conf.containsInConf(server.getId())) {
-          LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
-          try {
-            // leave some time for all RPC senders to send out new conf entry
-            server.properties().minRpcTimeout().sleep();
-          } catch (InterruptedException ignored) {
-            Thread.currentThread().interrupt();
-          }
-          // the pending request handler will send NotLeaderException for
-          // pending client requests when it stops
-          server.close();
+    if (conf.isTransitional()) {
+      replicateNewConf();
+    } else { // the (new) log entry has been committed
+      pendingRequests.replySetConfiguration(server::newSuccessReply);
+      // if the leader is not included in the current configuration, step down
+      if (!conf.containsInConf(server.getId())) {
+        LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
+        try {
+          // leave some time for all RPC senders to send out new conf entry
+          server.properties().minRpcTimeout().sleep();
+        } catch (InterruptedException ignored) {
+          Thread.currentThread().interrupt();
         }
+        // the pending request handler will send NotLeaderException for
+        // pending client requests when it stops
+        server.close();
       }
     }
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java
index f5ef1d6..aef6398 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java
@@ -22,8 +22,8 @@ import com.codahale.metrics.Timer;
 import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.metrics.RatisMetrics;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 
 public class RaftLogMetricsBase extends RatisMetrics implements RaftLogMetrics {
   public static final String RATIS_LOG_WORKER_METRICS_DESC = "Metrics for Log Worker";
@@ -56,13 +56,19 @@ public class RaftLogMetricsBase extends RatisMetrics implements RaftLogMetrics {
     return registry.timer(timerName);
   }
 
-  public void onLogEntryCommitted(LogEntryProto proto) {
-    if (proto.hasConfigurationEntry()) {
-      registry.counter(CONFIG_LOG_ENTRY_COUNT).inc();
-    } else if (proto.hasMetadataEntry()) {
-      registry.counter(METADATA_LOG_ENTRY_COUNT).inc();
-    } else if (proto.hasStateMachineLogEntry()) {
-      registry.counter(STATE_MACHINE_LOG_ENTRY_COUNT).inc();
+  @Override
+  public void onLogEntryCommitted(LogEntryHeader header) {
+    switch (header.getLogEntryBodyCase()) {
+      case CONFIGURATIONENTRY:
+        registry.counter(CONFIG_LOG_ENTRY_COUNT).inc();
+        return;
+      case METADATAENTRY:
+        registry.counter(METADATA_LOG_ENTRY_COUNT).inc();
+        return;
+      case STATEMACHINELOGENTRY:
+        registry.counter(STATE_MACHINE_LOG_ENTRY_COUNT).inc();
+        return;
+      default:
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 7fa91bc..6f80cdd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.metrics.RaftLogMetricsBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.raftlog.RaftLogBase;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.Preconditions;
@@ -49,6 +50,10 @@ public class MemoryRaftLog extends RaftLogBase {
       return TermIndex.valueOf(get(i));
     }
 
+    private LogEntryHeader getLogEntryHeader(int i) {
+      return LogEntryHeader.valueOf(get(i));
+    }
+
     int size() {
       return entries.size();
     }
@@ -108,7 +113,7 @@ public class MemoryRaftLog extends RaftLogBase {
   }
 
   @Override
-  public TermIndex[] getEntries(long startIndex, long endIndex) {
+  public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
       if (startIndex >= entries.size()) {
@@ -116,11 +121,11 @@ public class MemoryRaftLog extends RaftLogBase {
       }
       final int from = Math.toIntExact(startIndex);
       final int to = Math.toIntExact(Math.min(entries.size(), endIndex));
-      TermIndex[] ti = new TermIndex[to - from];
-      for (int i = 0; i < ti.length; i++) {
-        ti[i] = entries.getTermIndex(i);
+      final LogEntryHeader[] headers = new LogEntryHeader[to - from];
+      for (int i = 0; i < headers.length; i++) {
+        headers[i] = entries.getLogEntryHeader(i);
       }
-      return ti;
+      return headers;
     }
   }
 
@@ -231,9 +236,4 @@ public class MemoryRaftLog extends RaftLogBase {
     return CompletableFuture.completedFuture(lastSnapshotIndex);
     // do nothing
   }
-
-  @Override
-  public boolean isConfigEntry(TermIndex ti) {
-    return get(ti.getIndex()).hasConfigurationEntry();
-  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 665d407..672d304 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -21,6 +21,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
 import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -35,11 +36,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -88,15 +87,19 @@ public class LogSegment implements Comparable<Long> {
   static class LogRecord {
     /** starting offset in the file */
     private final long offset;
-    private final TermIndex termIndex;
+    private final LogEntryHeader logEntryHeader;
 
     LogRecord(long offset, LogEntryProto entry) {
       this.offset = offset;
-      this.termIndex = TermIndex.valueOf(entry);
+      this.logEntryHeader = LogEntryHeader.valueOf(entry);
+    }
+
+    LogEntryHeader getLogEntryHeader() {
+      return logEntryHeader;
     }
 
     TermIndex getTermIndex() {
-      return termIndex;
+      return getLogEntryHeader().getTermIndex();
     }
 
     long getOffset() {
@@ -273,7 +276,6 @@ public class LogSegment implements Comparable<Long> {
    * the entryCache caches the content of log entries.
    */
   private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
-  private final Set<TermIndex> configEntries = new HashSet<>();
 
   private LogSegment(RaftStorage storage, boolean isOpen, long start, long end,
       SegmentedRaftLogMetrics raftLogMetrics) {
@@ -328,9 +330,6 @@ public class LogSegment implements Comparable<Long> {
     if (keepEntryInCache) {
       putEntryCache(record.getTermIndex(), entry, op);
     }
-    if (entry.hasConfigurationEntry()) {
-      configEntries.add(record.getTermIndex());
-    }
     totalFileSize += getEntrySize(entry, op);
     endIndex = entry.getIndex();
   }
@@ -370,10 +369,6 @@ public class LogSegment implements Comparable<Long> {
     return last == null ? null : last.getTermIndex();
   }
 
-  boolean isConfigEntry(TermIndex ti) {
-    return configEntries.contains(ti);
-  }
-
   long getTotalFileSize() {
     return totalFileSize;
   }
@@ -390,7 +385,6 @@ public class LogSegment implements Comparable<Long> {
     for (long index = endIndex; index >= fromIndex; index--) {
       LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
       removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
-      configEntries.remove(removed.getTermIndex());
       totalFileSize = removed.offset;
     }
     isOpen = false;
@@ -417,7 +411,6 @@ public class LogSegment implements Comparable<Long> {
   synchronized void clear() {
     records.clear();
     evictCache();
-    configEntries.clear();
     endIndex = startIndex - 1;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1142bcf..925d4de 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -326,7 +327,7 @@ public class SegmentedRaftLog extends RaftLogBase {
   }
 
   @Override
-  public TermIndex[] getEntries(long startIndex, long endIndex) {
+  public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
       return cache.getTermIndices(startIndex, endIndex);
@@ -499,11 +500,6 @@ public class SegmentedRaftLog extends RaftLogBase {
   }
 
   @Override
-  public boolean isConfigEntry(TermIndex ti) {
-    return cache.isConfigEntry(ti);
-  }
-
-  @Override
   public void close() throws IOException {
     try(AutoCloseableLock writeLock = writeLock()) {
       super.close();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 2325556..60a55a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -221,8 +222,8 @@ public class SegmentedRaftLogCache {
       }
     }
 
-    TermIndex[] getTermIndex(long startIndex, long realEnd, LogSegment openSegment) {
-      final TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)];
+    LogEntryHeader[] getTermIndex(long startIndex, long realEnd, LogSegment openSegment) {
+      final LogEntryHeader[] entries = new LogEntryHeader[Math.toIntExact(realEnd - startIndex)];
       final int searchIndex;
       long index = startIndex;
 
@@ -487,7 +488,7 @@ public class SegmentedRaftLogCache {
    * @param startIndex inclusive
    * @param endIndex exclusive
    */
-  TermIndex[] getTermIndices(final long startIndex, final long endIndex) {
+  LogEntryHeader[] getTermIndices(final long startIndex, final long endIndex) {
     if (startIndex < 0 || startIndex < getStartIndex()) {
       throw new IndexOutOfBoundsException("startIndex = " + startIndex
           + ", log cache starts from index " + getStartIndex());
@@ -498,27 +499,21 @@ public class SegmentedRaftLogCache {
     }
     final long realEnd = Math.min(getEndIndex() + 1, endIndex);
     if (startIndex >= realEnd) {
-      return TermIndex.EMPTY_ARRAY;
+      return LogEntryHeader.EMPTY_ARRAY;
     }
     return closedSegments.getTermIndex(startIndex, realEnd, openSegment);
   }
 
   private static void getFromSegment(LogSegment segment, long startIndex,
-      TermIndex[] entries, int offset, int size) {
+      LogEntryHeader[] entries, int offset, int size) {
     long endIndex = segment.getEndIndex();
     endIndex = Math.min(endIndex, startIndex + size - 1);
     int index = offset;
     for (long i = startIndex; i <= endIndex; i++) {
-      LogRecord r = segment.getLogRecord(i);
-      entries[index++] = r == null ? null : r.getTermIndex();
+      entries[index++] = Optional.ofNullable(segment.getLogRecord(i)).map(LogRecord::getLogEntryHeader).orElse(null);
     }
   }
 
-  boolean isConfigEntry(TermIndex ti) {
-    LogSegment segment = getSegment(ti.getIndex());
-    return segment != null && segment.isConfigEntry(ti);
-  }
-
   long getStartIndex() {
     if (closedSegments.isEmpty()) {
       return Optional.ofNullable(openSegment).map(LogSegment::getStartIndex).orElse(RaftLog.INVALID_LOG_INDEX);
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 823dfb2..aa69fa4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.StringUtils;
@@ -95,9 +96,9 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
     long committedIndex = raftLog.getLastCommittedIndex();
     Assert.assertTrue(committedIndex >= expectedCommittedIndex);
     // check the log content
-    TermIndex[] entries = raftLog.getEntries(0, Long.MAX_VALUE);
+    final LogEntryHeader[] entries = raftLog.getEntries(0, Long.MAX_VALUE);
     int count = 0;
-    for (TermIndex entry : entries) {
+    for (LogEntryHeader entry : entries) {
       LogEntryProto log  = raftLog.get(entry.getIndex());
       if (!log.hasStateMachineLogEntry()) {
         continue;
@@ -244,10 +245,10 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
     final int expectedEntries = 8;
     final RaftLog raftLog = assertRaftLog(expectedEntries, leader);
 
-    final TermIndex[] entries = raftLog.getEntries(1, Long.MAX_VALUE);
+    final LogEntryHeader[] entries = raftLog.getEntries(1, Long.MAX_VALUE);
     final byte[] actual = new byte[ByteValue.BUFFERSIZE * expectedEntries];
     totalSize = 0;
-    for (TermIndex ti : entries) {
+    for (LogEntryHeader ti : entries) {
       final LogEntryProto e = raftLog.get(ti.getIndex());
       if (e.hasStateMachineLogEntry()) {
         final byte[] eValue = e.getStateMachineLogEntry().getLogData().toByteArray();
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 6cfe89f..6987d0a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,7 +31,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogBase;
@@ -156,7 +156,7 @@ public interface RaftTestUtil {
   static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
     int idxEntries = 0;
     int idxExpected = 0;
-    TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
+    final LogEntryHeader[] termIndices = log.getEntries(startIndex, endIndex);
     while (idxEntries < termIndices.length
         && idxExpected < expectedMessages.length) {
       try {
@@ -177,7 +177,7 @@ public interface RaftTestUtil {
   static boolean logEntriesNotContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
     int idxEntries = 0;
     int idxExpected = 0;
-    TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
+    final LogEntryHeader[] termIndices = log.getEntries(startIndex, endIndex);
     while (idxEntries < termIndices.length
         && idxExpected < expectedMessages.length) {
       try {
@@ -197,7 +197,7 @@ public interface RaftTestUtil {
 
   static void checkLogEntries(RaftLog log, SimpleMessage[] expectedMessages,
       Predicate<LogEntryProto> predicate) {
-    TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
+    final LogEntryHeader[] termIndices = log.getEntries(0, Long.MAX_VALUE);
     for (int i = 0; i < termIndices.length; i++) {
       for (int j = 0; j < expectedMessages.length; j++) {
         final LogEntryProto e;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index f7cdc36..89eed98 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -38,6 +38,7 @@ import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.StateMachine.DataChannel;
@@ -359,7 +360,7 @@ public interface DataStreamTestUtils {
   }
 
   static LogEntryProto searchLogEntry(ClientInvocationId invocationId, RaftLog log) throws Exception {
-    for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
+    for (LogEntryHeader termIndex : log.getEntries(0, Long.MAX_VALUE)) {
       final LogEntryProto entry = log.get(termIndex.getIndex());
       if (entry.hasStateMachineLogEntry()) {
         if (invocationId.match(entry.getStateMachineLogEntry())) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 4e92efd..d5dd0c3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -24,7 +24,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
@@ -96,11 +96,11 @@ public class TestRaftWithGrpc
       cluster.getServerAliveStream().filter(impl -> !impl.getInfo().isLeader()).forEach(raftServer ->
           JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> {
         final long leaderNextIndex = leaderLog.getNextIndex();
-        final TermIndex[] leaderEntries = leaderLog.getEntries(0, Long.MAX_VALUE);
+        final LogEntryHeader[] leaderEntries = leaderLog.getEntries(0, Long.MAX_VALUE);
 
         final RaftLog followerLog = raftServer.getRaftLog();
         Assert.assertEquals(leaderNextIndex, followerLog.getNextIndex());
-        final TermIndex[] serverEntries = followerLog.getEntries(0, Long.MAX_VALUE);
+        final LogEntryHeader[] serverEntries = followerLog.getEntries(0, Long.MAX_VALUE);
         Assert.assertArrayEquals(serverEntries, leaderEntries);
       }, 10, HUNDRED_MILLIS, "assertRaftLog-" + raftServer.getId(), LOG)));
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
index 6c893b3..c2cfb75 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
@@ -60,7 +61,7 @@ public class MemoryRaftLogTest extends BaseTest {
     entries2[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(0).build();
     raftLog.append(entries2).forEach(CompletableFuture::join);
 
-    TermIndex[] termIndices = raftLog.getEntries(0, 10);
+    final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
     assertEquals(2, termIndices.length);
     for (int i = 0; i < 2; i++) {
       assertEquals(entries1[i].getIndex(), termIndices[i].getIndex());
@@ -88,7 +89,7 @@ public class MemoryRaftLogTest extends BaseTest {
     entries2[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(2).build();
     raftLog.append(entries2).forEach(CompletableFuture::join);
 
-    TermIndex[] termIndices = raftLog.getEntries(0, 10);
+    final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
     assertEquals(1, termIndices.length);
     assertEquals(entries2[0].getIndex(), termIndices[0].getIndex());
     assertEquals(entries2[0].getTerm(), termIndices[0].getTerm());
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 2bc208b..e1fff14 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -34,6 +34,7 @@ import org.apache.ratis.server.RetryCache;
 import org.apache.ratis.server.metrics.RaftLogMetricsBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -198,7 +199,7 @@ public class TestSegmentedRaftLog extends BaseTest {
         Assert.assertEquals(e, entry);
       }
 
-      TermIndex[] termIndices = raftLog.getEntries(0, 500);
+      final LogEntryHeader[] termIndices = raftLog.getEntries(0, 500);
       LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
           .map(ti -> {
             try {
@@ -406,7 +407,7 @@ public class TestSegmentedRaftLog extends BaseTest {
         LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
         Assert.assertEquals(expected.get(i), entry);
       }
-      TermIndex[] termIndices = raftLog.getEntries(
+      final LogEntryHeader[] termIndices = raftLog.getEntries(
           expected.get(offset).getIndex(),
           expected.get(offset + size - 1).getIndex() + 1);
       LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 675b942..976e9d6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -29,6 +29,7 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
 import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
@@ -93,7 +94,7 @@ public class TestSegmentedRaftLogCache {
   }
 
   private void checkCacheEntries(long offset, int size, long end) {
-    TermIndex[] entries = cache.getTermIndices(offset, offset + size);
+    final LogEntryHeader[] entries = cache.getTermIndices(offset, offset + size);
     long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
     Assert.assertEquals(realEnd - offset, entries.length);
     for (long i = offset; i < realEnd; i++) {