You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/04 10:31:56 UTC

[incubator-ratis] branch master updated: RATIS-1198. Fix OOM when use FileStore to write 1GB file (#317)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 66768c4  RATIS-1198. Fix OOM when use FileStore to write 1GB file (#317)
66768c4 is described below

commit 66768c430a220f30e4069ed80687ab30e0718d2e
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Dec 4 18:31:47 2020 +0800

    RATIS-1198. Fix OOM when use FileStore to write 1GB file (#317)
---
 .../ratis/netty/server/DataStreamManagement.java   |  1 -
 .../apache/ratis/server/RaftServerConfigKeys.java  | 10 +++
 .../ratis/server/raftlog/segmented/LogSegment.java | 76 ++++++++++++++++------
 .../server/raftlog/segmented/SegmentedRaftLog.java | 16 +++--
 .../raftlog/segmented/SegmentedRaftLogCache.java   | 45 +++++++++----
 .../server/raftlog/segmented/TestLogSegment.java   | 24 +++----
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  2 +-
 .../segmented/TestSegmentedRaftLogCache.java       |  6 +-
 8 files changed, 123 insertions(+), 57 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 20be5bc..4ea4507 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -375,7 +375,6 @@ public class DataStreamManagement {
           } else if (request.getType() == Type.STREAM_CLOSE) {
             if (info.isPrimary()) {
               // after all server close stream, primary server start transaction
-              // TODO(runzhiwang): send start transaction to leader directly
               startTransaction(info, request, ctx);
             } else {
               sendReply(remoteWrites, request, bytesWritten, ctx);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9543380..0b5a84f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -221,6 +221,16 @@ public interface RaftServerConfigKeys {
       setInt(properties::setInt, SEGMENT_CACHE_NUM_MAX_KEY, maxCachedSegmentNum);
     }
 
+    String SEGMENT_CACHE_SIZE_MAX_KEY = PREFIX + ".segment.cache.size.max";
+    SizeInBytes SEGMENT_CACHE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("200MB");
+    static SizeInBytes segmentCacheSizeMax(RaftProperties properties) {
+      return getSizeInBytes(properties::getSizeInBytes, SEGMENT_CACHE_SIZE_MAX_KEY,
+          SEGMENT_CACHE_SIZE_MAX_DEFAULT, getDefaultLog());
+    }
+    static void setSegmentCacheSizeMax(RaftProperties properties, SizeInBytes maxCachedSegmentSize) {
+      setSizeInBytes(properties::set, SEGMENT_CACHE_SIZE_MAX_KEY, maxCachedSegmentSize);
+    }
+
     String PREALLOCATED_SIZE_KEY = PREFIX + ".preallocated.size";
     SizeInBytes PREALLOCATED_SIZE_DEFAULT = SizeInBytes.valueOf("4MB");
     static SizeInBytes preallocatedSize(RaftProperties properties) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index fb58a4a..fc0509e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -42,6 +42,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 
@@ -60,8 +61,26 @@ public class LogSegment implements Comparable<Long> {
 
   static final Logger LOG = LoggerFactory.getLogger(LogSegment.class);
 
-  static long getEntrySize(LogEntryProto entry) {
-    final int serialized = ServerProtoUtils.removeStateMachineData(entry).getSerializedSize();
+  enum Op {
+    LOAD_SEGMENT_FILE,
+    REMOVE_CACHE,
+    CHECK_SEGMENT_FILE_FULL,
+    WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
+    WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE
+  }
+
+  static long getEntrySize(LogEntryProto entry, Op op) {
+    LogEntryProto e = entry;
+    if (op == Op.CHECK_SEGMENT_FILE_FULL) {
+      e = ServerProtoUtils.removeStateMachineData(entry);
+    } else if (op == Op.LOAD_SEGMENT_FILE || op == Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE) {
+      Preconditions.assertTrue(entry == ServerProtoUtils.removeStateMachineData(entry),
+          () -> "Unexpected LogEntryProto with StateMachine data: op=" + op + ", entry=" + entry);
+    } else {
+      Preconditions.assertTrue(op == Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE || op == Op.REMOVE_CACHE,
+          () -> "Unexpected op " + op + ", entry=" + entry);
+    }
+    final int serialized = e.getSerializedSize();
     return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4L;
   }
 
@@ -139,7 +158,7 @@ public class LogSegment implements Comparable<Long> {
 
     final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
     final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, raftLogMetrics, entry -> {
-      segment.append(keepEntryInCache || isOpen, entry);
+      segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
       if (logConsumer != null) {
         logConsumer.accept(entry);
       }
@@ -161,9 +180,9 @@ public class LogSegment implements Comparable<Long> {
       // The segment does not have any entries, delete the file.
       FileUtils.deleteFile(file);
       return null;
-    } else if (file.length() > segment.getTotalSize()) {
+    } else if (file.length() > segment.getTotalFileSize()) {
       // The segment has extra padding, truncate it.
-      FileUtils.truncateFile(file, segment.getTotalSize());
+      FileUtils.truncateFile(file, segment.getTotalFileSize());
     }
 
     try {
@@ -211,7 +230,7 @@ public class LogSegment implements Comparable<Long> {
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
       readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics,
-          entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
+          entry -> putEntryCache(ServerProtoUtils.toTermIndex(entry), entry, Op.LOAD_SEGMENT_FILE));
       loadingTimes.incrementAndGet();
       return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
     }
@@ -224,7 +243,8 @@ public class LogSegment implements Comparable<Long> {
   }
 
   private volatile boolean isOpen;
-  private long totalSize = SegmentedRaftLogFormat.getHeaderLength();
+  private long totalFileSize = SegmentedRaftLogFormat.getHeaderLength();
+  private AtomicLong totalCacheSize = new AtomicLong(0);
   /** Segment start index, inclusive. */
   private long startIndex;
   /** Segment end index, inclusive. */
@@ -273,12 +293,12 @@ public class LogSegment implements Comparable<Long> {
     return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
   }
 
-  void appendToOpenSegment(LogEntryProto entry) {
+  void appendToOpenSegment(LogEntryProto entry, Op op) {
     Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this);
-    append(true, entry);
+    append(true, entry, op);
   }
 
-  private void append(boolean keepEntryInCache, LogEntryProto entry) {
+  private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
     Objects.requireNonNull(entry, "entry == null");
     if (records.isEmpty()) {
       Preconditions.assertTrue(entry.getIndex() == startIndex,
@@ -292,15 +312,15 @@ public class LogSegment implements Comparable<Long> {
           "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
     }
 
-    final LogRecord record = new LogRecord(totalSize, entry);
+    final LogRecord record = new LogRecord(totalFileSize, entry);
     records.add(record);
     if (keepEntryInCache) {
-      entryCache.put(record.getTermIndex(), entry);
+      putEntryCache(record.getTermIndex(), entry, op);
     }
     if (entry.hasConfigurationEntry()) {
       configEntries.add(record.getTermIndex());
     }
-    totalSize += getEntrySize(entry);
+    totalFileSize += getEntrySize(entry, op);
     endIndex = entry.getIndex();
   }
 
@@ -343,8 +363,12 @@ public class LogSegment implements Comparable<Long> {
     return configEntries.contains(ti);
   }
 
-  long getTotalSize() {
-    return totalSize;
+  long getTotalFileSize() {
+    return totalFileSize;
+  }
+
+  long getTotalCacheSize() {
+    return totalCacheSize.get();
   }
 
   /**
@@ -354,9 +378,9 @@ public class LogSegment implements Comparable<Long> {
     Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
     for (long index = endIndex; index >= fromIndex; index--) {
       LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
-      entryCache.remove(removed.getTermIndex());
+      removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
       configEntries.remove(removed.getTermIndex());
-      totalSize = removed.offset;
+      totalFileSize = removed.offset;
     }
     isOpen = false;
     this.endIndex = fromIndex - 1;
@@ -381,7 +405,7 @@ public class LogSegment implements Comparable<Long> {
 
   synchronized void clear() {
     records.clear();
-    entryCache.clear();
+    evictCache();
     configEntries.clear();
     endIndex = startIndex - 1;
   }
@@ -390,8 +414,22 @@ public class LogSegment implements Comparable<Long> {
     return loadingTimes.get();
   }
 
-  synchronized void evictCache() {
+  void evictCache() {
     entryCache.clear();
+    totalCacheSize.set(0);
+  }
+
+  void putEntryCache(TermIndex key, LogEntryProto value, Op op) {
+    final LogEntryProto previous = entryCache.put(key, value);
+    Preconditions.assertNull(previous, "entryCache should not contain duplicated entries");
+    totalCacheSize.getAndAdd(getEntrySize(value, op));
+  }
+
+  void removeEntryCache(TermIndex key, Op op) {
+    LogEntryProto value = entryCache.remove(key);
+    if (value != null) {
+      totalCacheSize.getAndAdd(-getEntrySize(value, op));
+    }
   }
 
   boolean hasCache() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1aa9616..c12dd66 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -380,7 +380,6 @@ public class SegmentedRaftLog extends RaftLog {
       } else if (isSegmentFull(currentOpenSegment, entry)) {
         cache.rollOpenSegment(true);
         fileLogWorker.rollLogSegment(currentOpenSegment);
-        checkAndEvictCache();
       } else if (currentOpenSegment.numOfEntries() > 0 &&
           currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
         // the term changes
@@ -390,9 +389,11 @@ public class SegmentedRaftLog extends RaftLog {
             currentTerm, entry.getTerm());
         cache.rollOpenSegment(true);
         fileLogWorker.rollLogSegment(currentOpenSegment);
-        checkAndEvictCache();
       }
 
+      // TODO(runzhiwang): If there is a performance problem, start a daemon thread to checkAndEvictCache
+      checkAndEvictCache();
+
       // If the entry has state machine data, then the entry should be inserted
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
@@ -400,9 +401,10 @@ public class SegmentedRaftLog extends RaftLog {
           fileLogWorker.writeLogEntry(entry).getFuture();
       if (stateMachineCachingEnabled) {
         // The stateMachineData will be cached inside the StateMachine itself.
-        cache.appendEntry(ServerProtoUtils.removeStateMachineData(entry));
+        cache.appendEntry(ServerProtoUtils.removeStateMachineData(entry),
+            LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
       } else {
-        cache.appendEntry(entry);
+        cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       }
       return writeFuture;
     } catch (Exception e) {
@@ -414,14 +416,14 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
-    if (segment.getTotalSize() >= segmentMaxSize) {
+    if (segment.getTotalFileSize() >= segmentMaxSize) {
       return true;
     } else {
-      final long entrySize = LogSegment.getEntrySize(entry);
+      final long entrySize = LogSegment.getEntrySize(entry, LogSegment.Op.CHECK_SEGMENT_FILE_FULL);
       // if entry size is greater than the max segment size, write it directly
       // into the current segment
       return entrySize <= segmentMaxSize &&
-          segment.getTotalSize() + entrySize > segmentMaxSize;
+          segment.getTotalFileSize() + entrySize > segmentMaxSize;
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 260718f..d32d994 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -167,10 +167,21 @@ public class SegmentedRaftLogCache {
       }
     }
 
-    long sizeInBytes() {
+    long getTotalFileSize() {
       return sizeInBytes;
     }
 
+    long getTotalCacheSize() {
+      try(AutoCloseableLock readLock = readLock()) {
+        long size = 0;
+        // TODO(runzhiwang): If there is a performance problem, start a daemon thread to checkAndEvictCache.
+        for (LogSegment seg : segments) {
+          size += seg.getTotalCacheSize();
+        }
+        return size;
+      }
+    }
+
     long countCached() {
       try(AutoCloseableLock readLock = readLock()) {
         return segments.stream().filter(LogSegment::hasCache).count();
@@ -231,7 +242,7 @@ public class SegmentedRaftLogCache {
 
     boolean add(LogSegment logSegment) {
       try(AutoCloseableLock writeLock = writeLock()) {
-        sizeInBytes += logSegment.getTotalSize();
+        sizeInBytes += logSegment.getTotalFileSize();
         return segments.add(logSegment);
       }
     }
@@ -258,9 +269,9 @@ public class SegmentedRaftLogCache {
               openSegment.truncate(index);
               Preconditions.assertTrue(!openSegment.isOpen());
               final SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
-                  oldEnd, true, openSegment.getTotalSize(), openSegment.getEndIndex());
+                  oldEnd, true, openSegment.getTotalFileSize(), openSegment.getEndIndex());
               segments.add(openSegment);
-              sizeInBytes += openSegment.getTotalSize();
+              sizeInBytes += openSegment.getTotalFileSize();
               clearOpenSegment.run();
               return new TruncationSegments(info, Collections.emptyList());
             }
@@ -269,15 +280,15 @@ public class SegmentedRaftLogCache {
           final LogSegment ts = segments.get(segmentIndex);
           final long oldEnd = ts.getEndIndex();
           final List<SegmentFileInfo> list = new ArrayList<>();
-          sizeInBytes -= ts.getTotalSize();
+          sizeInBytes -= ts.getTotalFileSize();
           ts.truncate(index);
-          sizeInBytes += ts.getTotalSize();
+          sizeInBytes += ts.getTotalFileSize();
           final int size = segments.size();
           for(int i = size - 1;
               i >= (ts.numOfEntries() == 0? segmentIndex: segmentIndex + 1);
               i--) {
             LogSegment s = segments.remove(i);
-            sizeInBytes -= s.getTotalSize();
+            sizeInBytes -= s.getTotalFileSize();
             final long endOfS = i == segmentIndex? oldEnd: s.getEndIndex();
             s.clear();
             list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, s.getEndIndex()));
@@ -286,7 +297,7 @@ public class SegmentedRaftLogCache {
             list.add(deleteOpenSegment(openSegment, clearOpenSegment));
           }
           SegmentFileInfo t = ts.numOfEntries() == 0? null:
-              new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, ts.getTotalSize(), ts.getEndIndex());
+              new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, ts.getTotalFileSize(), ts.getEndIndex());
           return new TruncationSegments(t, list);
         }
         return null;
@@ -313,7 +324,7 @@ public class SegmentedRaftLogCache {
               segmentIndex : segmentIndex - 1;
           for (int i = startIndex; i >= 0; i--) {
             LogSegment segment = segments.remove(i);
-            sizeInBytes -= segment.getTotalSize();
+            sizeInBytes -= segment.getTotalFileSize();
             list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
           }
         } else {
@@ -342,6 +353,7 @@ public class SegmentedRaftLogCache {
 
   private final int maxCachedSegments;
   private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
+  private final long maxSegmentCacheSize;
 
   SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties,
                                 RaftLogMetrics raftLogMetrics) {
@@ -353,6 +365,7 @@ public class SegmentedRaftLogCache {
     this.raftLogMetrics.addClosedSegmentsSizeInBytes(this);
     this.raftLogMetrics.addOpenSegmentSizeInBytes(this);
     this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
+    this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
   }
 
   int getMaxCachedSegments() {
@@ -373,15 +386,19 @@ public class SegmentedRaftLogCache {
   }
 
   public long getClosedSegmentsSizeInBytes() {
-    return closedSegments.sizeInBytes();
+    return closedSegments.getTotalFileSize();
   }
 
   public long getOpenSegmentSizeInBytes() {
-    return openSegment.getTotalSize();
+    return openSegment.getTotalFileSize();
+  }
+
+  public long getTotalCacheSize() {
+    return closedSegments.getTotalCacheSize() + openSegment.getTotalCacheSize();
   }
 
   boolean shouldEvict() {
-    return closedSegments.countCached() > maxCachedSegments;
+    return closedSegments.countCached() > maxCachedSegments || getTotalCacheSize() > maxSegmentCacheSize;
   }
 
   void evictCache(long[] followerIndices, long safeEvictIndex, long lastAppliedIndex) {
@@ -521,11 +538,11 @@ public class SegmentedRaftLogCache {
             closedSegments.get(closedSegments.size() - 1).getLastTermIndex());
   }
 
-  void appendEntry(LogEntryProto entry) {
+  void appendEntry(LogEntryProto entry, LogSegment.Op op) {
     // SegmentedRaftLog does the segment creation/rolling work. Here we just
     // simply append the entry into the open segment.
     Preconditions.assertTrue(openSegment != null);
-    openSegment.appendToOpenSegment(entry);
+    openSegment.appendToOpenSegment(entry, op);
   }
 
   /**
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index ed365b9..0380cae 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -128,7 +128,7 @@ public class TestLogSegment extends BaseTest {
     Assert.assertEquals(start, segment.getStartIndex());
     Assert.assertEquals(end, segment.getEndIndex());
     Assert.assertEquals(isOpen, segment.isOpen());
-    Assert.assertEquals(totalSize, segment.getTotalSize());
+    Assert.assertEquals(totalSize, segment.getTotalFileSize());
 
     long offset = SegmentedRaftLogFormat.getHeaderLength();
     for (long i = start; i <= end; i++) {
@@ -142,7 +142,7 @@ public class TestLogSegment extends BaseTest {
       if (entry == null) {
         entry = segment.loadCache(record);
       }
-      offset += getEntrySize(entry);
+      offset += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
   }
 
@@ -183,7 +183,7 @@ public class TestLogSegment extends BaseTest {
     LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
         1000, 1099, false, loadInitial, null, null);
     checkLogSegment(closedSegment, 1000, 1099, false,
-        closedSegment.getTotalSize(), 1);
+        closedSegment.getTotalFileSize(), 1);
     Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
   }
 
@@ -201,11 +201,11 @@ public class TestLogSegment extends BaseTest {
     while (size < max) {
       SimpleOperation op = new SimpleOperation("m" + i);
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
-      size += getEntrySize(entry);
-      segment.appendToOpenSegment(entry);
+      size += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
 
-    Assert.assertTrue(segment.getTotalSize() >= max);
+    Assert.assertTrue(segment.getTotalFileSize() >= max);
     checkLogSegment(segment, start, i - 1 + start, true, size, term);
   }
 
@@ -234,18 +234,18 @@ public class TestLogSegment extends BaseTest {
     final StateMachineLogEntryProto m = op.getLogEntryContent();
     try {
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001);
-      segment.appendToOpenSegment(entry);
+      segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       Assert.fail("should fail since the entry's index needs to be 1000");
     } catch (IllegalStateException e) {
       // the exception is expected.
     }
 
     LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000);
-    segment.appendToOpenSegment(entry);
+    segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
 
     try {
       entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002);
-      segment.appendToOpenSegment(entry);
+      segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       Assert.fail("should fail since the entry's index needs to be 1001");
     } catch (IllegalStateException e) {
       // the exception is expected.
@@ -260,7 +260,7 @@ public class TestLogSegment extends BaseTest {
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
           new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
-      segment.appendToOpenSegment(entry);
+      segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
 
     // truncate an open segment (remove 1080~1099)
@@ -313,7 +313,7 @@ public class TestLogSegment extends BaseTest {
         1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
       SimpleOperation op = new SimpleOperation(new String(content));
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
-      size = LogSegment.getEntrySize(entry);
+      size = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       out.write(entry);
     }
     Assert.assertEquals(file.length(),
@@ -340,7 +340,7 @@ public class TestLogSegment extends BaseTest {
     Arrays.fill(content, (byte) 1);
     SimpleOperation op = new SimpleOperation(new String(content));
     LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
-    final long entrySize = LogSegment.getEntrySize(entry);
+    final long entrySize = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
 
     long totalSize = SegmentedRaftLogFormat.getHeaderLength();
     long preallocated = 16 * 1024;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index e2cd6acc..4ed18be 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -74,7 +74,7 @@ public class TestSegmentedRaftLog extends BaseTest {
   }
 
   public static long getOpenSegmentSize(RaftLog raftLog) {
-    return ((SegmentedRaftLog)raftLog).getRaftLogCache().getOpenSegment().getTotalSize();
+    return ((SegmentedRaftLog)raftLog).getRaftLogCache().getOpenSegment().getTotalFileSize();
   }
 
   private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 67775e6..30340ca 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -61,7 +61,7 @@ public class TestSegmentedRaftLogCache {
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
-      s.appendToOpenSegment(entry);
+      s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
     if (!isOpen) {
       s.close();
@@ -152,7 +152,7 @@ public class TestSegmentedRaftLogCache {
     final SimpleOperation m = new SimpleOperation("m");
     try {
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0);
-      cache.appendEntry(entry);
+      cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       Assert.fail("the open segment is null");
     } catch (IllegalStateException ignored) {
     }
@@ -161,7 +161,7 @@ public class TestSegmentedRaftLogCache {
     cache.addSegment(openSegment);
     for (long index = 101; index < 200; index++) {
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index);
-      cache.appendEntry(entry);
+      cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
 
     Assert.assertNotNull(cache.getOpenSegment());