You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/04/26 17:56:33 UTC

incubator-ratis git commit: RATIS-76. Add loading policy for RaftLogCache. Contributed by Jing Zhao.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master b2c691382 -> 13fdb9ee6


RATIS-76. Add loading policy for RaftLogCache. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/13fdb9ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/13fdb9ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/13fdb9ee

Branch: refs/heads/master
Commit: 13fdb9ee6cb89a2bfa3533ab7955b33951de491b
Parents: b2c6913
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Apr 26 10:55:00 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Apr 26 10:55:16 2017 -0700

----------------------------------------------------------------------
 .../ratis/grpc/server/GRpcLogAppender.java      |  10 +-
 .../org/apache/ratis/grpc/TestRaftStream.java   |   2 +-
 .../apache/ratis/server/impl/LogAppender.java   |  16 +-
 .../apache/ratis/server/storage/LogSegment.java | 168 ++++++++++++++-----
 .../ratis/server/storage/MemoryRaftLog.java     |   1 -
 .../apache/ratis/server/storage/RaftLog.java    |   2 +-
 .../ratis/server/storage/RaftLogCache.java      |  42 +++--
 .../server/storage/RaftLogIOException.java      |  29 ++++
 .../ratis/server/storage/SegmentedRaftLog.java  |  41 +++--
 .../java/org/apache/ratis/RaftTestUtil.java     |  20 ++-
 .../ratis/server/storage/TestRaftLogCache.java  |  27 +--
 .../server/storage/TestRaftLogSegment.java      |  39 +++--
 .../server/storage/TestSegmentedRaftLog.java    |  21 ++-
 13 files changed, 300 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 656adc2..92dd257 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.grpc.server;
 
+import org.apache.ratis.server.storage.RaftLogIOException;
 import org.apache.ratis.shaded.io.grpc.Status;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
@@ -79,7 +80,12 @@ public class GRpcLogAppender extends LogAppender {
           installSnapshot(snapshot, snapshotResponseHandler);
         } else {
           // keep appending log entries or sending heartbeats
-          appendLog();
+          try {
+            appendLog();
+          } catch (RaftLogIOException e) {
+            LOG.error(this + " hit IOException while loading raft log", e);
+            stopSender();
+          }
         }
       }
 
@@ -107,7 +113,7 @@ public class GRpcLogAppender extends LogAppender {
         shouldWaitForFirstResponse();
   }
 
-  private void appendLog() {
+  private void appendLog() throws RaftLogIOException {
     if (appendLogRequestObserver == null) {
       appendLogRequestObserver = client.appendEntries(appendResponseHandler);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 83ca5ec..1efe4d3 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -104,7 +104,7 @@ public class TestRaftStream {
   }
 
   private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
-      Supplier<byte[]> s) {
+      Supplier<byte[]> s) throws IOException {
     long committedIndex = raftLog.getLastCommittedIndex();
     Assert.assertEquals(expectedCommittedIndex, committedIndex);
     // check the log content

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index ff12f4e..a48d236 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftLogIOException;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -87,6 +88,8 @@ public class LogAppender extends Daemon {
       checkAndSendAppendEntries();
     } catch (InterruptedException | InterruptedIOException e) {
       LOG.info(this + " was interrupted: " + e);
+    } catch (RaftLogIOException e) {
+      LOG.error(this + " hit IOException while loading raft log", e);
     }
   }
 
@@ -150,7 +153,7 @@ public class LogAppender extends Daemon {
     return previous;
   }
 
-  protected AppendEntriesRequestProto createRequest() {
+  protected AppendEntriesRequestProto createRequest() throws RaftLogIOException {
     final TermIndex previous = getPrevious();
     final long leaderNext = raftLog.getNextIndex();
     long next = follower.getNextIndex() + buffer.getPendingEntryNum();
@@ -178,7 +181,7 @@ public class LogAppender extends Daemon {
 
   /** Send an appendEntries RPC; retry indefinitely. */
   private AppendEntriesReplyProto sendAppendEntriesWithRetries()
-      throws InterruptedException, InterruptedIOException {
+      throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
     AppendEntriesRequestProto request = null;
     while (isAppenderRunning()) { // keep retrying for IOException
@@ -202,9 +205,10 @@ public class LogAppender extends Daemon {
         follower.updateLastRpcResponseTime();
 
         return r;
-      } catch (InterruptedIOException iioe) {
-        throw iioe;
+      } catch (InterruptedIOException | RaftLogIOException e) {
+        throw e;
       } catch (IOException ioe) {
+        // TODO should have more detailed retry policy here.
         LOG.trace(this + ": failed to send appendEntries; retry " + retry++, ioe);
       }
       if (isAppenderRunning()) {
@@ -279,7 +283,7 @@ public class LogAppender extends Daemon {
             InstallSnapshotRequestProto request =
                 server.createInstallSnapshotRequest(follower.getPeer().getId(),
                     requestId, requestIndex++, snapshot,
-                    Arrays.asList(chunk), done);
+                    Collections.singletonList(chunk), done);
             currentOffset += targetLength;
             chunkIndex++;
 
@@ -372,7 +376,7 @@ public class LogAppender extends Daemon {
 
   /** Check and send appendEntries RPC */
   private void checkAndSendAppendEntries()
-      throws InterruptedException, InterruptedIOException {
+      throws InterruptedException, InterruptedIOException, RaftLogIOException {
     while (isAppenderRunning()) {
       if (shouldSendRequest()) {
         SnapshotInfo snapshot = shouldInstallSnapshot();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
index 3856585..6c478dd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
@@ -17,8 +17,10 @@
  */
 package org.apache.ratis.server.storage;
 
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.shaded.com.google.common.cache.CacheLoader;
 import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.FileUtils;
@@ -28,11 +30,15 @@ import org.apache.ratis.util.ProtoUtils;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 /**
@@ -47,7 +53,6 @@ class LogSegment implements Comparable<Long> {
     return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4;
   }
 
-  @VisibleForTesting
   static class LogRecord {
     /** starting offset in the file */
     private final long offset;
@@ -67,44 +72,42 @@ class LogSegment implements Comparable<Long> {
     }
   }
 
-  private boolean isOpen;
-  private long totalSize;
-  private final long startIndex;
-  private long endIndex;
-  /**
-   * the list of records is more like the index of a segment
-   */
-  private final List<LogRecord> records = new ArrayList<>();
-  /**
-   * the entryCache caches the content of log entries.
-   * TODO: currently we cache all the log entries. will fix it soon.
-   */
-  private final Map<TermIndex, LogEntryProto> entryCache = new HashMap<>();
-  private final Set<TermIndex> configEntries = new HashSet<>();
+  static class LogRecordWithEntry {
+    private final LogRecord record;
+    private final LogEntryProto entry;
 
-  private LogSegment(boolean isOpen, long start, long end) {
-    this.isOpen = isOpen;
-    this.startIndex = start;
-    this.endIndex = end;
-    totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+    LogRecordWithEntry(LogRecord record, LogEntryProto entry) {
+      this.record = record;
+      this.entry = entry;
+    }
+
+    LogRecord getRecord() {
+      return record;
+    }
+
+    LogEntryProto getEntry() {
+      return entry;
+    }
+
+    boolean hasEntry() {
+      return entry != null;
+    }
   }
 
-  static LogSegment newOpenSegment(long start) {
+  static LogSegment newOpenSegment(RaftStorage storage, long start) {
     Preconditions.assertTrue(start >= 0);
-    return new LogSegment(true, start, start - 1);
+    return new LogSegment(storage, true, start, start - 1);
   }
 
-  private static LogSegment newCloseSegment(long start, long end) {
+  private static LogSegment newCloseSegment(RaftStorage storage,
+      long start, long end) {
     Preconditions.assertTrue(start >= 0 && end >= start);
-    return new LogSegment(false, start, end);
+    return new LogSegment(storage, false, start, end);
   }
 
-  static LogSegment loadSegment(File file, long start, long end, boolean isOpen,
-      Consumer<LogEntryProto> logConsumer) throws IOException {
-    final LogSegment segment;
+  private static void readSegmentFile(File file, long start, long end,
+      boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws IOException {
     try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
-      segment = isOpen ? LogSegment.newOpenSegment(start) :
-          LogSegment.newCloseSegment(start, end);
       LogEntryProto next;
       LogEntryProto prev = null;
       while ((next = in.nextEntry()) != null) {
@@ -112,14 +115,29 @@ class LogSegment implements Comparable<Long> {
           Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
               "gap between entry %s and entry %s", prev, next);
         }
-        segment.append(next);
 
-        if (logConsumer != null) {
-          logConsumer.accept(next);
+        if (entryConsumer != null) {
+          entryConsumer.accept(next);
         }
         prev = next;
       }
     }
+  }
+
+  static LogSegment loadSegment(RaftStorage storage, File file,
+      long start, long end, boolean isOpen,
+      boolean keptInCache, Consumer<LogEntryProto> logConsumer)
+      throws IOException {
+    final LogSegment segment = isOpen ?
+        LogSegment.newOpenSegment(storage, start) :
+        LogSegment.newCloseSegment(storage, start, end);
+
+    readSegmentFile(file, start, end, isOpen, entry -> {
+      segment.append(keptInCache | isOpen, entry);
+      if (logConsumer != null) {
+        logConsumer.accept(entry);
+      }
+    });
 
     // truncate padding if necessary
     if (file.length() > segment.getTotalSize()) {
@@ -133,6 +151,57 @@ class LogSegment implements Comparable<Long> {
     return segment;
   }
 
+  /**
+   * The current log entry loader simply loads the whole segment into the memory.
+   * In most of the cases this may be good enough considering the main use case
+   * for load log entries is for leader appending to followers.
+   *
+   * In the future we can make the cache loader configurable if necessary.
+   */
+  class LogEntryLoader extends CacheLoader<LogRecord, LogEntryProto> {
+    @Override
+    public LogEntryProto load(LogRecord key) throws IOException {
+      final File file = getSegmentFile();
+      readSegmentFile(file, startIndex, endIndex, isOpen,
+          entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
+      loadingTimes.incrementAndGet();
+      return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
+    }
+  }
+
+  private File getSegmentFile() {
+    return isOpen ?
+        storage.getStorageDir().getOpenLogFile(startIndex) :
+        storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
+  }
+
+  private boolean isOpen;
+  private long totalSize;
+  private final long startIndex;
+  private long endIndex;
+  private final RaftStorage storage;
+  private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader();
+  /** later replace it with a metric */
+  private final AtomicInteger loadingTimes = new AtomicInteger();
+
+  /**
+   * the list of records is more like the index of a segment
+   */
+  private final List<LogRecord> records = new ArrayList<>();
+  /**
+   * the entryCache caches the content of log entries.
+   */
+  private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
+  private final Set<TermIndex> configEntries = new HashSet<>();
+
+  private LogSegment(RaftStorage storage, boolean isOpen, long start, long end) {
+    this.storage = storage;
+    this.isOpen = isOpen;
+    this.startIndex = start;
+    this.endIndex = end;
+    totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+  }
+
   long getStartIndex() {
     return startIndex;
   }
@@ -152,10 +221,10 @@ class LogSegment implements Comparable<Long> {
   void appendToOpenSegment(LogEntryProto... entries) {
     Preconditions.assertTrue(isOpen(),
         "The log segment %s is not open for append", this.toString());
-    append(entries);
+    append(true, entries);
   }
 
-  private void append(LogEntryProto... entries) {
+  private void append(boolean keptInCache, LogEntryProto... entries) {
     Preconditions.assertTrue(entries != null && entries.length > 0);
     final long term = entries[0].getTerm();
     if (records.isEmpty()) {
@@ -177,7 +246,9 @@ class LogSegment implements Comparable<Long> {
 
       final LogRecord record = new LogRecord(totalSize, entry);
       records.add(record);
-      entryCache.put(record.getTermIndex(), entry);
+      if (keptInCache) {
+        entryCache.put(record.getTermIndex(), entry);
+      }
       if (ProtoUtils.isConfigurationLogEntry(entry)) {
         configEntries.add(record.getTermIndex());
       }
@@ -186,14 +257,27 @@ class LogSegment implements Comparable<Long> {
     }
   }
 
-  LogEntryProto getLogEntry(long index) {
+  LogRecordWithEntry getEntryWithoutLoading(long index) {
     LogRecord record = getLogRecord(index);
-    return record == null ? null : entryCache.get(record.getTermIndex());
+    if (record == null) {
+      return null;
+    }
+    return new LogRecordWithEntry(record, entryCache.get(record.getTermIndex()));
   }
 
-  TermIndex getTermIndex(long index) {
-    LogRecord record = getLogRecord(index);
-    return record == null ? null : record.getTermIndex();
+  /**
+   * Acquire LogSegment's monitor so that there is no concurrent loading.
+   */
+  synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException {
+    LogEntryProto entry = entryCache.get(record.getTermIndex());
+    if (entry != null) {
+      return entry;
+    }
+    try {
+      return cacheLoader.load(record);
+    } catch (Exception e) {
+      throw new RaftLogIOException(e);
+    }
   }
 
   LogRecord getLogRecord(long index) {
@@ -259,4 +343,8 @@ class LogSegment implements Comparable<Long> {
     configEntries.clear();
     endIndex = startIndex - 1;
   }
+
+  public int getLoadingTimes() {
+    return loadingTimes.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index df71d08..ecbae2e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -154,7 +154,6 @@ public class MemoryRaftLog extends RaftLog {
       if (toTruncate) {
         truncate(truncateIndex);
       }
-      //  Collections.addAll(this.entries, entries);
       for (int i = index; i < entries.length; i++) {
         this.entries.add(entries[i]);
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 5002f83..4d84a57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -179,7 +179,7 @@ public abstract class RaftLog implements Closeable {
    * @return The log entry associated with the given index.
    *         Null if there is no log entry with the index.
    */
-  public abstract LogEntryProto get(long index);
+  public abstract LogEntryProto get(long index) throws RaftLogIOException;
 
   /**
    * Get the TermIndex information of the given index.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 0142bee..7f13dd8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -27,6 +27,7 @@ import java.util.NoSuchElementException;
 
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 
 import org.apache.ratis.util.Preconditions;
@@ -68,8 +69,10 @@ class RaftLogCache {
 
   private LogSegment openSegment;
   private final List<LogSegment> closedSegments;
+  private final RaftStorage storage;
 
-  RaftLogCache() {
+  RaftLogCache(RaftStorage storage) {
+    this.storage = storage;
     closedSegments = new ArrayList<>();
   }
 
@@ -116,13 +119,13 @@ class RaftLogCache {
     openSegment.close();
     closedSegments.add(openSegment);
     if (createNewOpen) {
-      openSegment = LogSegment.newOpenSegment(nextIndex);
+      openSegment = LogSegment.newOpenSegment(storage, nextIndex);
     } else {
       openSegment = null;
     }
   }
 
-  private LogSegment getSegment(long index) {
+  LogSegment getSegment(long index) {
     if (openSegment != null && index >= openSegment.getStartIndex()) {
       return openSegment;
     } else {
@@ -131,21 +134,16 @@ class RaftLogCache {
     }
   }
 
-  LogEntryProto getEntry(long index) {
+  LogRecord getLogRecord(long index) {
     LogSegment segment = getSegment(index);
-    return segment == null ? null : segment.getLogEntry(index);
-  }
-
-  TermIndex getTermIndex(long index) {
-    LogSegment segment = getSegment(index);
-    return segment == null ? null : segment.getTermIndex(index);
+    return segment == null ? null : segment.getLogRecord(index);
   }
 
   /**
    * @param startIndex inclusive
    * @param endIndex exclusive
    */
-  TermIndex[] getEntries(final long startIndex, final long endIndex) {
+  TermIndex[] getTermIndices(final long startIndex, final long endIndex) {
     if (startIndex < 0 || startIndex < getStartIndex()) {
       throw new IndexOutOfBoundsException("startIndex = " + startIndex
           + ", log cache starts from index " + getStartIndex());
@@ -162,19 +160,19 @@ class RaftLogCache {
     TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)];
     int segmentIndex = Collections.binarySearch(closedSegments, startIndex);
     if (segmentIndex < 0) {
-      getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length);
+      getFromSegment(openSegment, startIndex, entries, 0, entries.length);
     } else {
       long index = startIndex;
       for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) {
         LogSegment s = closedSegments.get(i);
         int numberFromSegment = Math.toIntExact(
             Math.min(realEnd - index, s.getEndIndex() - index + 1));
-        getEntriesFromSegment(s, index, entries,
+        getFromSegment(s, index, entries,
             Math.toIntExact(index - startIndex), numberFromSegment);
         index += numberFromSegment;
       }
       if (index < realEnd) {
-        getEntriesFromSegment(openSegment, index, entries,
+        getFromSegment(openSegment, index, entries,
             Math.toIntExact(index - startIndex),
             Math.toIntExact(realEnd - index));
       }
@@ -182,13 +180,13 @@ class RaftLogCache {
     return entries;
   }
 
-  private void getEntriesFromSegment(LogSegment segment, long startIndex,
+  private void getFromSegment(LogSegment segment, long startIndex,
       TermIndex[] entries, int offset, int size) {
     long endIndex = segment.getEndIndex();
     endIndex = Math.min(endIndex, startIndex + size - 1);
     int index = offset;
     for (long i = startIndex; i <= endIndex; i++) {
-      LogSegment.LogRecord r = segment.getLogRecord(i);
+      LogRecord r = segment.getLogRecord(i);
       entries[index++] = r == null ? null : r.getTermIndex();
     }
   }
@@ -286,11 +284,11 @@ class RaftLogCache {
     return null;
   }
 
-  Iterator<LogEntryProto> iterator(long startIndex) {
+  Iterator<TermIndex> iterator(long startIndex) {
     return new EntryIterator(startIndex);
   }
 
-  private class EntryIterator implements Iterator<LogEntryProto> {
+  private class EntryIterator implements Iterator<TermIndex> {
     private long nextIndex;
     private LogSegment currentSegment;
     private int segmentIndex;
@@ -321,10 +319,10 @@ class RaftLogCache {
     }
 
     @Override
-    public LogEntryProto next() {
-      LogEntryProto entry;
+    public TermIndex next() {
+      LogRecord record;
       if (currentSegment == null ||
-          (entry = currentSegment.getLogEntry(nextIndex)) == null) {
+          (record = currentSegment.getLogRecord(nextIndex)) == null) {
         throw new NoSuchElementException();
       }
       if (++nextIndex > currentSegment.getEndIndex()) {
@@ -334,7 +332,7 @@ class RaftLogCache {
               openSegment : closedSegments.get(segmentIndex);
         }
       }
-      return entry;
+      return record.getTermIndex();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
new file mode 100644
index 0000000..b9fb5ff
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.protocol.RaftException;
+
+/**
+ * Exception while reading/writing RaftLog
+ */
+public class RaftLogIOException extends RaftException {
+  public RaftLogIOException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index c956f84..8eaa0ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -23,6 +23,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.LogSegment.LogRecord;
+import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -105,7 +107,7 @@ public class SegmentedRaftLog extends RaftLog {
     super(selfId);
     this.storage = storage;
     this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    cache = new RaftLogCache();
+    cache = new RaftLogCache(storage);
     fileLogWorker = new RaftLogWorker(server, storage, properties);
     lastCommitted.set(lastIndexInSnapshot);
   }
@@ -136,8 +138,8 @@ public class SegmentedRaftLog extends RaftLog {
       List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
       for (LogPathAndIndex pi : paths) {
         boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
-        LogSegment logSegment = LogSegment.loadSegment(pi.path.toFile(),
-            pi.startIndex, pi.endIndex, isOpen, logConsumer);
+        LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(),
+            pi.startIndex, pi.endIndex, isOpen, true, logConsumer);
         cache.addSegment(logSegment);
       }
 
@@ -155,18 +157,35 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public LogEntryProto get(long index) {
+  public LogEntryProto get(long index) throws RaftLogIOException {
     checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      return cache.getEntry(index);
+    LogSegment segment;
+    LogRecordWithEntry recordAndEntry;
+    try (AutoCloseableLock readLock = readLock()) {
+      segment = cache.getSegment(index);
+      if (segment == null) {
+        return null;
+      }
+      recordAndEntry = segment.getEntryWithoutLoading(index);
+      if (recordAndEntry == null) {
+        return null;
+      }
+      if (recordAndEntry.hasEntry()) {
+        return recordAndEntry.getEntry();
+      }
     }
+
+    // the entry is not in the segment's cache. Load the cache without holding
+    // RaftLog's lock.
+    return segment.loadCache(recordAndEntry.getRecord());
   }
 
   @Override
   public TermIndex getTermIndex(long index) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
-      return cache.getTermIndex(index);
+      LogRecord record = cache.getLogRecord(index);
+      return record != null ? record.getTermIndex() : null;
     }
   }
 
@@ -174,7 +193,7 @@ public class SegmentedRaftLog extends RaftLog {
   public TermIndex[] getEntries(long startIndex, long endIndex) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
-      return cache.getEntries(startIndex, endIndex);
+      return cache.getTermIndices(startIndex, endIndex);
     }
   }
 
@@ -208,7 +227,7 @@ public class SegmentedRaftLog extends RaftLog {
     try(AutoCloseableLock writeLock = writeLock()) {
       final LogSegment currentOpenSegment = cache.getOpenSegment();
       if (currentOpenSegment == null) {
-        cache.addSegment(LogSegment.newOpenSegment(entry.getIndex()));
+        cache.addSegment(LogSegment.newOpenSegment(storage, entry.getIndex()));
         fileLogWorker.startLogSegment(getNextIndex());
       } else if (isSegmentFull(currentOpenSegment, entry)) {
         cache.rollOpenSegment(true);
@@ -248,11 +267,11 @@ public class SegmentedRaftLog extends RaftLog {
       return;
     }
     try(AutoCloseableLock writeLock = writeLock()) {
-      Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex());
+      Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex());
       int index = 0;
       long truncateIndex = -1;
       for (; iter.hasNext() && index < entries.length; index++) {
-        LogEntryProto storedEntry = iter.next();
+        TermIndex storedEntry = iter.next();
         Preconditions.assertTrue(
             storedEntry.getIndex() == entries[index].getIndex(),
             "The stored entry's index %s is not consistent with" +

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 76258e5..0680df9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -101,10 +101,13 @@ public class RaftTestUtil {
     TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
     while (idxEntries < termIndices.length
         && idxExpected < expectedMessages.length) {
-      if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
-          log.get(termIndices[idxEntries].getIndex()).getSmLogEntry()
-              .getData().toByteArray())) {
-        ++idxExpected;
+      try {
+        if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
+            log.get(termIndices[idxEntries].getIndex()).getSmLogEntry().getData().toByteArray())) {
+          ++idxExpected;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
       ++idxEntries;
     }
@@ -112,7 +115,7 @@ public class RaftTestUtil {
   }
 
   public static void assertLogEntries(Collection<RaftServerImpl> servers,
-                                      SimpleMessage... expectedMessages) {
+      SimpleMessage... expectedMessages) {
     final int size = servers.size();
     final long count = servers.stream()
         .filter(RaftServerImpl::isAlive)
@@ -129,7 +132,12 @@ public class RaftTestUtil {
       long startIndex, long expertedTerm, SimpleMessage... expectedMessages) {
     Assert.assertEquals(expectedMessages.length, entries.length);
     for(int i = 0; i < entries.length; i++) {
-      final LogEntryProto e = log.get(entries[i].getIndex());
+      final LogEntryProto e;
+      try {
+        e = log.get(entries[i].getIndex());
+      } catch (IOException exception) {
+        throw new RuntimeException(exception);
+      }
       Assert.assertEquals(expertedTerm, e.getTerm());
       Assert.assertEquals(startIndex + i, e.getIndex());
       Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index d9f7c10..38c879b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.storage;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
@@ -37,11 +38,11 @@ public class TestRaftLogCache {
 
   @Before
   public void setup() {
-    cache = new RaftLogCache();
+    cache = new RaftLogCache(null);
   }
 
   private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
-    LogSegment s = LogSegment.newOpenSegment(start);
+    LogSegment s = LogSegment.newOpenSegment(null, start);
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
@@ -54,12 +55,12 @@ public class TestRaftLogCache {
     return s;
   }
 
-  private void checkCache(long start, long end, int segmentSize) {
+  private void checkCache(long start, long end, int segmentSize) throws IOException {
     Assert.assertEquals(start, cache.getStartIndex());
     Assert.assertEquals(end, cache.getEndIndex());
 
     for (long index = start; index <= end; index++) {
-      LogEntryProto entry = cache.getEntry(index);
+      LogEntryProto entry = cache.getSegment(index).getEntryWithoutLoading(index).getEntry();
       Assert.assertEquals(index, entry.getIndex());
     }
 
@@ -75,7 +76,7 @@ public class TestRaftLogCache {
   }
 
   private void checkCacheEntries(long offset, int size, long end) {
-    TermIndex[] entries = cache.getEntries(offset, offset + size);
+    TermIndex[] entries = cache.getTermIndices(offset, offset + size);
     long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
     Assert.assertEquals(realEnd - offset, entries.length);
     for (long i = offset; i < realEnd; i++) {
@@ -220,16 +221,16 @@ public class TestRaftLogCache {
     Assert.assertEquals(249, ts.toTruncate.endIndex);
   }
 
-  private void testIterator(long startIndex) {
-    Iterator<LogEntryProto> iterator = cache.iterator(startIndex);
-    LogEntryProto prev = null;
+  private void testIterator(long startIndex) throws IOException {
+    Iterator<TermIndex> iterator = cache.iterator(startIndex);
+    TermIndex prev = null;
     while (iterator.hasNext()) {
-      LogEntryProto entry = iterator.next();
-      Assert.assertEquals(cache.getEntry(entry.getIndex()), entry);
+      TermIndex termIndex = iterator.next();
+      Assert.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
       if (prev != null) {
-        Assert.assertEquals(prev.getIndex() + 1, entry.getIndex());
+        Assert.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
       }
-      prev = entry;
+      prev = termIndex;
     }
     if (startIndex <= cache.getEndIndex()) {
       Assert.assertNotNull(prev);
@@ -254,7 +255,7 @@ public class TestRaftLogCache {
     }
     testIterator(299);
 
-    Iterator<LogEntryProto> iterator = cache.iterator(300);
+    Iterator<TermIndex> iterator = cache.iterator(300);
     Assert.assertFalse(iterator.hasNext());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index 4e90d75..73709fc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -86,7 +87,7 @@ public class TestRaftLogSegment {
   }
 
   private void checkLogSegment(LogSegment segment, long start, long end,
-      boolean isOpen, long totalSize, long term) {
+      boolean isOpen, long totalSize, long term) throws Exception {
     Assert.assertEquals(start, segment.getStartIndex());
     Assert.assertEquals(end, segment.getEndIndex());
     Assert.assertEquals(isOpen, segment.isOpen());
@@ -95,35 +96,51 @@ public class TestRaftLogSegment {
     long offset = SegmentedRaftLog.HEADER_BYTES.length;
     for (long i = start; i <= end; i++) {
       LogSegment.LogRecord record = segment.getLogRecord(i);
-      LogEntryProto entry = segment.getLogEntry(i);
-      Assert.assertEquals(i, entry.getIndex());
-      Assert.assertEquals(term, entry.getTerm());
+      LogRecordWithEntry lre = segment.getEntryWithoutLoading(i);
+      Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex());
+      Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm());
       Assert.assertEquals(offset, record.getOffset());
 
+      LogEntryProto entry = lre.hasEntry() ?
+          lre.getEntry() : segment.loadCache(lre.getRecord());
       offset += getEntrySize(entry);
     }
   }
 
   @Test
   public void testLoadLogSegment() throws Exception {
+    testLoadSegment(true);
+  }
+
+  @Test
+  public void testLoadCache() throws Exception {
+    testLoadSegment(false);
+  }
+
+  private void testLoadSegment(boolean loadInitial) throws Exception {
     // load an open segment
     File openSegmentFile = prepareLog(true, 0, 100, 0);
-    LogSegment openSegment = LogSegment.loadSegment(openSegmentFile, 0,
-        INVALID_LOG_INDEX, true, null);
+    RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
+    LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
+        INVALID_LOG_INDEX, true, loadInitial, null);
     checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0);
+    storage.close();
+    // for open segment we currently always keep log entries in the memory
+    Assert.assertEquals(0, openSegment.getLoadingTimes());
 
     // load a closed segment (1000-1099)
     File closedSegmentFile = prepareLog(false, 1000, 100, 1);
-    LogSegment closedSegment = LogSegment.loadSegment(closedSegmentFile, 1000,
-        1099, false, null);
+    LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
+        1000, 1099, false, loadInitial, null);
     checkLogSegment(closedSegment, 1000, 1099, false,
         closedSegment.getTotalSize(), 1);
+    Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
   }
 
   @Test
   public void testAppendEntries() throws Exception {
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(start);
+    LogSegment segment = LogSegment.newOpenSegment(null, start);
     long size = SegmentedRaftLog.HEADER_BYTES.length;
     final long max = 8 * 1024 * 1024;
     checkLogSegment(segment, start, start - 1, true, size, 0);
@@ -147,7 +164,7 @@ public class TestRaftLogSegment {
 
   @Test
   public void testAppendWithGap() throws Exception {
-    LogSegment segment = LogSegment.newOpenSegment(1000);
+    LogSegment segment = LogSegment.newOpenSegment(null, 1000);
     SimpleOperation op = new SimpleOperation("m");
     final SMLogEntryProto m = op.getLogEntryContent();
     try {
@@ -185,7 +202,7 @@ public class TestRaftLogSegment {
   public void testTruncate() throws Exception {
     final long term = 1;
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(start);
+    LogSegment segment = LogSegment.newOpenSegment(null, start);
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = ProtoUtils.toLogEntryProto(
           new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index afd7d29..1d38971 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -120,7 +120,8 @@ public class TestSegmentedRaftLog {
     return list;
   }
 
-  private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) {
+  private LogEntryProto getLastEntry(SegmentedRaftLog raftLog)
+      throws IOException {
     return raftLog.get(raftLog.getLastEntryTermIndex().getIndex());
   }
 
@@ -142,7 +143,13 @@ public class TestSegmentedRaftLog {
 
       TermIndex[] termIndices = raftLog.getEntries(0, 500);
       LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
-          .map(ti -> raftLog.get(ti.getIndex()))
+          .map(ti -> {
+            try {
+              return raftLog.get(ti.getIndex());
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          })
           .collect(Collectors.toList())
           .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY);
       Assert.assertArrayEquals(entries, entriesFromLog);
@@ -265,7 +272,7 @@ public class TestSegmentedRaftLog {
   }
 
   private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected,
-      int offset, int size) {
+      int offset, int size) throws IOException {
     if (size > 0) {
       for (int i = offset; i < size + offset; i++) {
         LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
@@ -275,7 +282,13 @@ public class TestSegmentedRaftLog {
           expected.get(offset).getIndex(),
           expected.get(offset + size - 1).getIndex() + 1);
       LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
-          .map(ti -> raftLog.get(ti.getIndex()))
+          .map(ti -> {
+            try {
+              return raftLog.get(ti.getIndex());
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          })
           .collect(Collectors.toList())
           .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY);
       LogEntryProto[] expectedArray = expected.subList(offset, offset + size)