You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ar...@apache.org on 2023/02/17 00:34:24 UTC

[ratis] branch master updated: RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)

This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f54dfe8 RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)
c4f54dfe8 is described below

commit c4f54dfe88d1b574a03688f16a592cf0d59d13a9
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Feb 16 16:34:18 2023 -0800

    RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)
---
 .../ratis/server/raftlog/segmented/LogSegment.java | 47 +++++++++++-----------
 .../raftlog/segmented/SegmentedRaftLogCache.java   |  7 +++-
 .../segmented/SegmentedRaftLogInputStream.java     | 15 ++++---
 .../raftlog/segmented/SegmentedRaftLogReader.java  | 27 +++++++------
 .../segmented/SegmentedRaftLogTestUtils.java       | 10 +++++
 .../impl/SimpleStateMachine4Testing.java           |  4 +-
 .../raftlog/segmented/TestCacheEviction.java       |  4 +-
 .../server/raftlog/segmented/TestLogSegment.java   | 20 ++++-----
 .../raftlog/segmented/TestRaftLogReadWrite.java    |  7 ++--
 .../segmented/TestSegmentedRaftLogCache.java       |  3 +-
 .../java/org/apache/ratis/tools/ParseRatisLog.java | 14 +++++--
 11 files changed, 94 insertions(+), 64 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index c3c4d6e53..b8e0e72ff 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -31,6 +31,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,35 +103,31 @@ public final class LogSegment implements Comparable<Long> {
     }
   }
 
-  static LogSegment newOpenSegment(RaftStorage storage, long start, SegmentedRaftLogMetrics raftLogMetrics) {
+  static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
+      SegmentedRaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0);
-    return new LogSegment(storage, true, start, start - 1, raftLogMetrics);
+    return new LogSegment(storage, true, start, start - 1, maxOpSize, raftLogMetrics);
   }
 
   @VisibleForTesting
   static LogSegment newCloseSegment(RaftStorage storage,
-      long start, long end, SegmentedRaftLogMetrics raftLogMetrics) {
+      long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0 && end >= start);
-    return new LogSegment(storage, false, start, end, raftLogMetrics);
-  }
-
-  static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SegmentedRaftLogMetrics metrics) {
-    return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), metrics)
-        : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), metrics);
+    return new LogSegment(storage, false, start, end, maxOpSize, raftLogMetrics);
   }
 
-  public static int readSegmentFile(File file, LogSegmentStartEnd startEnd,
-      CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
-      throws IOException {
-    return readSegmentFile(file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(),
-        corruptionPolicy, raftLogMetrics, entryConsumer);
+  static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
+      SegmentedRaftLogMetrics metrics) {
+    return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), maxOpSize, metrics)
+        : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), maxOpSize, metrics);
   }
 
-  private static int readSegmentFile(File file, long start, long end, boolean isOpen,
+  public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
       CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
       throws IOException {
     int count = 0;
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen, raftLogMetrics)) {
+    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
+        file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(), maxOpSize, raftLogMetrics)) {
       for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
         if (prev != null) {
           Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
@@ -146,8 +143,8 @@ public final class LogSegment implements Comparable<Long> {
       switch (corruptionPolicy) {
         case EXCEPTION: throw ioe;
         case WARN_AND_RETURN:
-          LOG.warn("Failed to read segment file {} (start={}, end={}, isOpen? {}): only {} entries read successfully",
-              file, start, end, isOpen, count, ioe);
+          LOG.warn("Failed to read segment file {} ({}): only {} entries read successfully",
+              file, startEnd, count, ioe);
           break;
         default:
           throw new IllegalStateException("Unexpected enum value: " + corruptionPolicy
@@ -158,13 +155,13 @@ public final class LogSegment implements Comparable<Long> {
     return count;
   }
 
-  static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd,
+  static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
       boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, SegmentedRaftLogMetrics raftLogMetrics)
       throws IOException {
-    final LogSegment segment = newLogSegment(storage, startEnd, raftLogMetrics);
+    final LogSegment segment = newLogSegment(storage, startEnd, maxOpSize, raftLogMetrics);
     final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
     final boolean isOpen = startEnd.isOpen();
-    final int entryCount = readSegmentFile(file, startEnd, corruptionPolicy, raftLogMetrics, entry -> {
+    final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> {
       segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
       if (logConsumer != null) {
         logConsumer.accept(entry);
@@ -235,7 +232,9 @@ public final class LogSegment implements Comparable<Long> {
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
       final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
-      readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics, entry -> {
+      final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
+      readSegmentFile(file, startEnd, maxOpSize,
+          getLogCorruptionPolicy(), raftLogMetrics, entry -> {
         final TermIndex ti = TermIndex.valueOf(entry);
         putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE);
         if (ti.equals(key.getTermIndex())) {
@@ -259,6 +258,7 @@ public final class LogSegment implements Comparable<Long> {
   /** Segment end index, inclusive. */
   private volatile long endIndex;
   private RaftStorage storage;
+  private final SizeInBytes maxOpSize;
   private final LogEntryLoader cacheLoader;
   /** later replace it with a metric */
   private final AtomicInteger loadingTimes = new AtomicInteger();
@@ -272,12 +272,13 @@ public final class LogSegment implements Comparable<Long> {
    */
   private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
 
-  private LogSegment(RaftStorage storage, boolean isOpen, long start, long end,
+  private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, SizeInBytes maxOpSize,
       SegmentedRaftLogMetrics raftLogMetrics) {
     this.storage = storage;
     this.isOpen = isOpen;
     this.startIndex = start;
     this.endIndex = end;
+    this.maxOpSize = maxOpSize;
     this.cacheLoader = new LogEntryLoader(raftLogMetrics);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 3f79e69db..631cbf951 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -32,6 +32,7 @@ import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.AutoCloseableReadWriteLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -350,6 +351,7 @@ public class SegmentedRaftLogCache {
   private volatile LogSegment openSegment;
   private final LogSegmentList closedSegments;
   private final RaftStorage storage;
+  private final SizeInBytes maxOpSize;
   private final SegmentedRaftLogMetrics raftLogMetrics;
 
   private final int maxCachedSegments;
@@ -367,6 +369,7 @@ public class SegmentedRaftLogCache {
     this.raftLogMetrics.addOpenSegmentSizeInBytes(this::getOpenSegmentSizeInBytes);
     this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
     this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
+    this.maxOpSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
   }
 
   int getMaxCachedSegments() {
@@ -376,7 +379,7 @@ public class SegmentedRaftLogCache {
   void loadSegment(LogSegmentPath pi, boolean keepEntryInCache,
       Consumer<LogEntryProto> logConsumer) throws IOException {
     final LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), pi.getStartEnd(),
-        keepEntryInCache, logConsumer, raftLogMetrics);
+        maxOpSize, keepEntryInCache, logConsumer, raftLogMetrics);
     if (logSegment != null) {
       addSegment(logSegment);
     }
@@ -434,7 +437,7 @@ public class SegmentedRaftLogCache {
   }
 
   void addOpenSegment(long startIndex) {
-    setOpenSegment(LogSegment.newOpenSegment(storage, startIndex,raftLogMetrics));
+    setOpenSegment(LogSegment.newOpenSegment(storage, startIndex, maxOpSize, raftLogMetrics));
   }
 
   private void setOpenSegment(LogSegment openSegment) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index e445b1abb..481f837f5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,14 +67,12 @@ public class SegmentedRaftLogInputStream implements Closeable {
   private final boolean isOpen;
   private final OpenCloseState state;
   private SegmentedRaftLogReader reader;
+  private final SizeInBytes maxOpSize;
   private final SegmentedRaftLogMetrics raftLogMetrics;
 
-  public SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen) {
-    this(log, startIndex, endIndex, isOpen, null);
-  }
-
   SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen,
-      SegmentedRaftLogMetrics raftLogMetrics) {
+      SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
+    this.maxOpSize = maxOpSize;
     if (isOpen) {
       Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX);
     } else {
@@ -92,7 +91,7 @@ public class SegmentedRaftLogInputStream implements Closeable {
     state.open();
     boolean initSuccess = false;
     try {
-      reader = new SegmentedRaftLogReader(logFile, raftLogMetrics);
+      reader = new SegmentedRaftLogReader(logFile, maxOpSize, raftLogMetrics);
       initSuccess = reader.verifyHeader();
     } finally {
       if (!initSuccess) {
@@ -191,11 +190,11 @@ public class SegmentedRaftLogInputStream implements Closeable {
    * @return Result of the validation
    * @throws IOException
    */
-  static LogValidation scanEditLog(File file, long maxTxIdToScan)
+  static LogValidation scanEditLog(File file, long maxTxIdToScan, SizeInBytes maxOpSize)
       throws IOException {
     SegmentedRaftLogInputStream in;
     try {
-      in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, null);
+      in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, maxOpSize, null);
       // read the header, initialize the inputstream
       in.init();
     } catch (EOFException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index 7f5509401..4cb9e3130 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -28,6 +28,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.PureJavaCrc32C;
+import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,21 +133,22 @@ class SegmentedRaftLogReader implements Closeable {
     }
   }
 
-  private static final int MAX_OP_SIZE = 32 * 1024 * 1024;
-
   private final File file;
   private final LimitedInputStream limiter;
   private final DataInputStream in;
   private byte[] temp = new byte[4096];
   private final Checksum checksum;
   private final SegmentedRaftLogMetrics raftLogMetrics;
+  private final SizeInBytes maxOpSize;
 
-  SegmentedRaftLogReader(File file, SegmentedRaftLogMetrics raftLogMetrics) throws FileNotFoundException {
+  SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics)
+      throws FileNotFoundException {
     this.file = file;
     this.limiter = new LimitedInputStream(
         new BufferedInputStream(new FileInputStream(file)));
     in = new DataInputStream(limiter);
     checksum = new PureJavaCrc32C();
+    this.maxOpSize = maxOpSize;
     this.raftLogMetrics = raftLogMetrics;
   }
 
@@ -267,8 +269,9 @@ class SegmentedRaftLogReader implements Closeable {
    * @return The log entry, or null if we hit EOF.
    */
   private LogEntryProto decodeEntry() throws IOException {
-    limiter.setLimit(MAX_OP_SIZE);
-    in.mark(MAX_OP_SIZE);
+    final int max = maxOpSize.getSizeInt();
+    limiter.setLimit(max);
+    in.mark(max);
 
     byte nextByte;
     try {
@@ -288,17 +291,17 @@ class SegmentedRaftLogReader implements Closeable {
     // Here, we verify that the Op size makes sense and that the
     // data matches its checksum before attempting to construct an Op.
     int entryLength = CodedInputStream.readRawVarint32(nextByte, in);
-    if (entryLength > MAX_OP_SIZE) {
+    if (entryLength > max) {
       throw new IOException("Entry has size " + entryLength
-          + ", but MAX_OP_SIZE = " + MAX_OP_SIZE);
+          + ", but MAX_OP_SIZE = " + maxOpSize);
     }
 
     final int varintLength = CodedOutputStream.computeUInt32SizeNoTag(
         entryLength);
     final int totalLength = varintLength + entryLength;
-    checkBufferSize(totalLength);
+    checkBufferSize(totalLength, max);
     in.reset();
-    in.mark(MAX_OP_SIZE);
+    in.mark(max);
     IOUtils.readFully(in, temp, 0, totalLength);
 
     // verify checksum
@@ -317,12 +320,12 @@ class SegmentedRaftLogReader implements Closeable {
         CodedInputStream.newInstance(temp, varintLength, entryLength));
   }
 
-  private void checkBufferSize(int entryLength) {
-    Preconditions.assertTrue(entryLength <= MAX_OP_SIZE);
+  private void checkBufferSize(int entryLength, int max) {
+    Preconditions.assertTrue(entryLength <= max);
     int length = temp.length;
     if (length < entryLength) {
       while (length < entryLength) {
-        length = Math.min(length * 2, MAX_OP_SIZE);
+        length = Math.min(length * 2, max);
       }
       temp = new byte[length];
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
index f2387f3f0..e242eddf5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
@@ -17,10 +17,20 @@
  */
 package org.apache.ratis.server.raftlog.segmented;
 
+import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.Slf4jUtils;
 import org.slf4j.event.Level;
 
+import java.io.File;
+
 public interface SegmentedRaftLogTestUtils {
+  SizeInBytes MAX_OP_SIZE = SizeInBytes.valueOf("32MB");
+
+  static SegmentedRaftLogInputStream newSegmentedRaftLogInputStream(File log,
+      long startIndex, long endIndex, boolean isOpen) {
+    return new SegmentedRaftLogInputStream(log, startIndex, endIndex, isOpen, MAX_OP_SIZE, null);
+  }
+
   static void setRaftLogWorkerLogLevel(Level level) {
     Slf4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, level);
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 800ab02ea..2ce5643cc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -36,9 +36,9 @@ import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogInputStream;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -295,7 +295,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
         return;
       }
       final long endIndex = snapshot.getIndex();
-      try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
+      try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
           snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
         LogEntryProto entry;
         while ((entry = in.nextEntry()) != null) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 3acc17164..996f7ef52 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -49,6 +49,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
+
 public class TestCacheEviction extends BaseTest {
   private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
 
@@ -56,7 +58,7 @@ public class TestCacheEviction extends BaseTest {
     Assert.assertEquals(numSegments, cached.length);
     final LogSegmentList segments = new LogSegmentList(JavaUtils.getClassSimpleName(TestCacheEviction.class));
     for (int i = 0; i < numSegments; i++) {
-      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, null);
+      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, MAX_OP_SIZE, null);
       if (cached[i]) {
         s = Mockito.spy(s);
         Mockito.when(s.hasCache()).thenReturn(true);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index e573248f4..0a58f44f6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
 import static org.apache.ratis.server.raftlog.segmented.LogSegment.getEntrySize;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
 
 /**
  * Test basic functionality of {@link LogSegment}
@@ -169,7 +170,7 @@ public class TestLogSegment extends BaseTest {
     final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten);
     RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
     final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
-        LogSegmentStartEnd.valueOf(0), loadInitial, null, null);
+        LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, loadInitial, null, null);
     final int delta = isLastEntryPartiallyWritten? 1: 0;
     checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0);
     storage.close();
@@ -179,7 +180,7 @@ public class TestLogSegment extends BaseTest {
     // load a closed segment (1000-1099)
     final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
     LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
-        LogSegmentStartEnd.valueOf(1000, 1099L), loadInitial, null, null);
+        LogSegmentStartEnd.valueOf(1000, 1099L), MAX_OP_SIZE, loadInitial, null, null);
     checkLogSegment(closedSegment, 1000, 1099, false,
         closedSegment.getTotalFileSize(), 1);
     Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
@@ -188,7 +189,7 @@ public class TestLogSegment extends BaseTest {
   @Test
   public void testAppendEntries() throws Exception {
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(null, start, null);
+    LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
     long size = SegmentedRaftLogFormat.getHeaderLength();
     final long max = 8 * 1024 * 1024;
     checkLogSegment(segment, start, start - 1, true, size, 0);
@@ -214,7 +215,7 @@ public class TestLogSegment extends BaseTest {
     final File openSegmentFile = prepareLog(true, 0, 100, 0, true);
     RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
     final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
-        LogSegmentStartEnd.valueOf(0), true, null, raftLogMetrics);
+        LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, true, null, raftLogMetrics);
     checkLogSegment(openSegment, 0, 98, true, openSegmentFile.length(), 0);
     storage.close();
 
@@ -227,7 +228,7 @@ public class TestLogSegment extends BaseTest {
 
   @Test
   public void testAppendWithGap() throws Exception {
-    LogSegment segment = LogSegment.newOpenSegment(null, 1000, null);
+    LogSegment segment = LogSegment.newOpenSegment(null, 1000, MAX_OP_SIZE, null);
     SimpleOperation op = new SimpleOperation("m");
     final StateMachineLogEntryProto m = op.getLogEntryContent();
     try {
@@ -254,7 +255,7 @@ public class TestLogSegment extends BaseTest {
   public void testTruncate() throws Exception {
     final long term = 1;
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(null, start, null);
+    LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(
           new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
@@ -297,7 +298,8 @@ public class TestLogSegment extends BaseTest {
                 new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) {
           Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a));
         }
-        try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
+        try(SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+            file, 0, INVALID_LOG_INDEX, true)) {
           LogEntryProto entry = in.nextEntry();
           Assert.assertNull(entry);
         }
@@ -317,8 +319,8 @@ public class TestLogSegment extends BaseTest {
     }
     Assert.assertEquals(file.length(),
         size + SegmentedRaftLogFormat.getHeaderLength());
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0,
-        INVALID_LOG_INDEX, true)) {
+    try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+        file, 0, INVALID_LOG_INDEX, true)) {
       LogEntryProto entry = in.nextEntry();
       Assert.assertArrayEquals(content,
           entry.getStateMachineLogEntry().getLogData().toByteArray());
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 88b5e2f48..e79f9f7f9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -75,7 +75,8 @@ public class TestRaftLogReadWrite extends BaseTest {
   private LogEntryProto[] readLog(File file, long startIndex, long endIndex,
       boolean isOpen) throws IOException {
     List<LogEntryProto> list = new ArrayList<>();
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, startIndex, endIndex, isOpen)) {
+    try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+        file, startIndex, endIndex, isOpen)) {
       LogEntryProto entry;
       while ((entry = in.nextEntry()) != null) {
         list.add(entry);
@@ -207,8 +208,8 @@ public class TestRaftLogReadWrite extends BaseTest {
     }
 
     List<LogEntryProto> list = new ArrayList<>();
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(openSegment, 0,
-        RaftLog.INVALID_LOG_INDEX, true)) {
+    try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+        openSegment, 0, RaftLog.INVALID_LOG_INDEX, true)) {
       LogEntryProto entry;
       while ((entry = in.nextEntry()) != null) {
         list.add(entry);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 4f135628c..1cf3d0248 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.raftlog.segmented;
 
 import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -59,7 +60,7 @@ public class TestSegmentedRaftLogCache {
   }
 
   private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
-    LogSegment s = LogSegment.newOpenSegment(null, start, null);
+    LogSegment s = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
diff --git a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
index 250a4790a..564ce0bf0 100644
--- a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
+++ b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
@@ -24,6 +24,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
 import org.apache.ratis.server.raftlog.segmented.LogSegment;
+import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,15 +34,17 @@ public final class ParseRatisLog {
 
   private final File file;
   private final Function<StateMachineLogEntryProto, String> smLogToString;
+  private final SizeInBytes maxOpSize;
 
   private long numConfEntries;
   private long numMetadataEntries;
   private long numStateMachineEntries;
   private long numInvalidEntries;
 
-  private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString) {
+  private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString, SizeInBytes maxOpSize) {
     this.file = f;
     this.smLogToString = smLogToString;
+    this.maxOpSize = maxOpSize;
     this.numConfEntries = 0;
     this.numMetadataEntries = 0;
     this.numStateMachineEntries = 0;
@@ -56,7 +59,7 @@ public final class ParseRatisLog {
     }
 
     System.out.println("Processing Raft Log file: " + file.getAbsolutePath() + " size:" + file.length());
-    final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(),
+    final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(), maxOpSize,
         RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, this::processLogEntry);
     System.out.println("Num Total Entries: " + entryCount);
     System.out.println("Num Conf Entries: " + numConfEntries);
@@ -85,7 +88,12 @@ public final class ParseRatisLog {
   public static class Builder {
     private File file = null;
     private Function<StateMachineLogEntryProto, String> smLogToString = null;
+    private SizeInBytes maxOpSize = SizeInBytes.valueOf("32MB");
 
+    public Builder setMaxOpSize(SizeInBytes maxOpSize) {
+      this.maxOpSize = maxOpSize;
+      return this;
+    }
 
     public Builder setSegmentFile(File segmentFile) {
       this.file = segmentFile;
@@ -98,7 +106,7 @@ public final class ParseRatisLog {
     }
 
     public ParseRatisLog build() {
-      return new ParseRatisLog(file, smLogToString);
+      return new ParseRatisLog(file, smLogToString, maxOpSize);
     }
   }
 }