You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/05/10 17:57:28 UTC

incubator-ratis git commit: RATIS-82. Add cache eviction policy. Contributed by Jing Zhao.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 6eb4f8278 -> 2fbbe0aa9


RATIS-82. Add cache eviction policy. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2fbbe0aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2fbbe0aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2fbbe0aa

Branch: refs/heads/master
Commit: 2fbbe0aa9df31e5e982b235071e0b438f0241453
Parents: 6eb4f82
Author: Jing Zhao <ji...@apache.org>
Authored: Wed May 10 10:57:20 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed May 10 10:57:20 2017 -0700

----------------------------------------------------------------------
 .../ratis/hadooprpc/TestRaftWithHadoopRpc.java  |   2 +-
 .../ratis/server/RaftServerConfigKeys.java      |  11 +-
 .../apache/ratis/server/impl/LeaderState.java   |   4 +
 .../ratis/server/impl/RaftServerImpl.java       |   8 +
 .../ratis/server/impl/ServerProtoUtils.java     |   4 +-
 .../server/storage/CacheInvalidationPolicy.java | 112 ++++++++++
 .../apache/ratis/server/storage/LogSegment.java |  39 +++-
 .../ratis/server/storage/RaftLogCache.java      |  50 ++++-
 .../ratis/server/storage/SegmentedRaftLog.java  |  34 +++-
 .../ratis/server/storage/TestCacheEviction.java | 202 +++++++++++++++++++
 .../ratis/server/storage/TestRaftLogCache.java  |   4 +-
 .../server/storage/TestSegmentedRaftLog.java    |   4 +-
 12 files changed, 450 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
index 124e7ee..0519e52 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
@@ -31,7 +31,7 @@ import static org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServer
 
 public class TestRaftWithHadoopRpc extends RaftBasicTests {
   static {
-    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.TRACE);
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
     LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index be0c6bc..abab5af 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -23,7 +23,6 @@ import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
 
 import static org.apache.ratis.conf.ConfUtils.*;
 
@@ -76,6 +75,16 @@ public interface RaftServerConfigKeys {
       setSizeInBytes(properties::set, SEGMENT_SIZE_MAX_KEY, segmentSizeMax);
     }
 
+    /**
+     * The max number of segments, not counting the open segment, that cache log entries.
+     */
+    String SEGMENT_CACHE_MAX_NUM_KEY = PREFIX + ".segment.cache.num.max";
+    int SEGMENT_CACHE_MAX_NUM_DEFAULT = 6;
+    static int maxCachedSegmentNum(RaftProperties properties) {
+      return getInt(properties::getInt, SEGMENT_CACHE_MAX_NUM_KEY,
+          SEGMENT_CACHE_MAX_NUM_DEFAULT, requireMin(0));
+    }
+
     String PREALLOCATED_SIZE_KEY = PREFIX + ".preallocated.size";
     SizeInBytes PREALLOCATED_SIZE_DEFAULT = SizeInBytes.valueOf("4MB");
     static SizeInBytes preallocatedSize(RaftProperties properties) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 96bd28b..c4f92db 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -523,6 +523,10 @@ public class LeaderState {
     return pendingRequests.getTransactionContext(index);
   }
 
+  long[] getFollowerNextIndices() { // one next index per sender's follower
+    return senders.stream().mapToLong(s -> s.getFollower().getNextIndex()).toArray();
+  }
+
   private class ConfigurationStagingState {
     private final Map<RaftPeerId, RaftPeer> newPeers;
     private final PeerConfiguration newConf;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3c704b0..dca8b08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -913,6 +913,14 @@ public class RaftServerImpl implements RaftServer {
     return null;
   }
 
+  public synchronized long[] getFollowerNextIndices() {
+    // Returns null (not an empty array) when this peer has no leader state
+    // or is not currently the leader.
+    final LeaderState leader = this.leaderState;
+    final boolean leading = leader != null && isLeader();
+    return leading ? leader.getFollowerNextIndices() : null;
+  }
+
   public void applyLogToStateMachine(LogEntryProto next) {
     if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
       // the reply should have already been set. only need to record

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 945be8d..6705e91 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -75,8 +75,8 @@ public class ServerProtoUtils {
   }
 
   private static String toString(RaftRpcReplyProto reply) {
-    return reply.getRequestorId() + "->" + reply.getReplyId() + ","
-        + reply.getSuccess();
+    return reply.getRequestorId().toStringUtf8() + "->" // both ids as UTF-8 text
+        + reply.getReplyId().toStringUtf8() + "," + reply.getSuccess();
   }
 
   public static RaftConfigurationProto toRaftConfigurationProto(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
new file mode 100644
index 0000000..12534cf
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public interface CacheInvalidationPolicy {
+  /**
+   * Determine which log segments should evict their log entry cache
+   * @param followerNextIndices the next indices of all the follower peers. Null
+   *                            if the local peer is not a leader.
+   * @param localFlushedIndex the index that has been flushed to the local disk.
+   * @param lastAppliedIndex the last index that has been applied to state machine
+   * @param segments The list of log segments. The segments should be sorted in
+   *                 ascending order according to log index.
+   * @param maxCachedSegments the max number of segments with cached log entries
+   * @return the log segments that should evict cache
+   */
+  List<LogSegment> evict(long[] followerNextIndices, long localFlushedIndex,
+      long lastAppliedIndex, List<LogSegment> segments, int maxCachedSegments);
+
+  class CacheInvalidationPolicyDefault implements CacheInvalidationPolicy {
+    @Override
+    public List<LogSegment> evict(long[] followerNextIndices,
+        long localFlushedIndex, long lastAppliedIndex,
+        List<LogSegment> segments, final int maxCachedSegments) {
+      List<LogSegment> result = new ArrayList<>();
+      int safeIndex = segments.size() - 1;
+      for (; safeIndex >= 0; safeIndex--) {
+        LogSegment segment = segments.get(safeIndex);
+        // a segment's cache can be invalidated only if it's closed and all its
+        // entries have been flushed to the local disk
+        if (!segment.isOpen() && segment.getEndIndex() <= localFlushedIndex) {
+          break;
+        }
+      }
+      if (followerNextIndices == null || followerNextIndices.length == 0) {
+        // no followers, determine the eviction based on lastAppliedIndex
+        // first scan from the oldest segment to the one that is right before
+        // lastAppliedIndex. All these segment's cache can be invalidated.
+        int j = 0;
+        for (; j <= safeIndex; j++) {
+          LogSegment segment = segments.get(j);
+          if (segment.getEndIndex() > lastAppliedIndex) {
+            break;
+          }
+          if (segment.hasCache()) {
+            result.add(segment);
+          }
+        }
+        // if there is no cache invalidation target found, pick a segment that
+        // later (but not now) the state machine will consume
+        if (result.isEmpty()) {
+          for (int i = safeIndex; i >= j; i--) {
+            LogSegment s = segments.get(i);
+            if (s.getStartIndex() > lastAppliedIndex && s.hasCache()) {
+              result.add(s);
+              break;
+            }
+          }
+        }
+      } else {
+        // this peer is the leader with followers. determine the eviction based
+        // on followers' next indices (only read, never reordered) and the local
+        // lastAppliedIndex. segments covering index minToRead will still be
+        // loaded. Thus we first try to evict cache for segments before minToRead.
+        final long minNext = Arrays.stream(followerNextIndices).min().getAsLong();
+        final long minToRead = Math.min(minNext, lastAppliedIndex);
+        int j = 0;
+        for (; j <= safeIndex; j++) {
+          LogSegment s = segments.get(j);
+          if (s.getEndIndex() >= minToRead) {
+            break;
+          }
+          if (s.hasCache()) {
+            result.add(s);
+          }
+        }
+        // if there is no eviction target, continue the scanning and evict
+        // the one that is not being read currently.
+        if (result.isEmpty()) {
+          for (; j <= safeIndex; j++) {
+            LogSegment s = segments.get(j);
+            if (Arrays.stream(followerNextIndices).noneMatch(s::containsIndex)
+                && !s.containsIndex(lastAppliedIndex) && s.hasCache()) {
+              result.add(s);
+              break;
+            }
+          }
+        }
+      }
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
index 6c478dd..ff7f353 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
@@ -30,8 +30,6 @@ import org.apache.ratis.util.ProtoUtils;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -99,7 +97,8 @@ class LogSegment implements Comparable<Long> {
     return new LogSegment(storage, true, start, start - 1);
   }
 
-  private static LogSegment newCloseSegment(RaftStorage storage,
+  @VisibleForTesting
+  static LogSegment newCloseSegment(RaftStorage storage,
       long start, long end) {
     Preconditions.assertTrue(start >= 0 && end >= start);
     return new LogSegment(storage, false, start, end);
@@ -126,14 +125,14 @@ class LogSegment implements Comparable<Long> {
 
   static LogSegment loadSegment(RaftStorage storage, File file,
       long start, long end, boolean isOpen,
-      boolean keptInCache, Consumer<LogEntryProto> logConsumer)
+      boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer)
       throws IOException {
     final LogSegment segment = isOpen ?
         LogSegment.newOpenSegment(storage, start) :
         LogSegment.newCloseSegment(storage, start, end);
 
     readSegmentFile(file, start, end, isOpen, entry -> {
-      segment.append(keptInCache | isOpen, entry);
+      segment.append(keepEntryInCache | isOpen, entry);
       if (logConsumer != null) {
         logConsumer.accept(entry);
       }
@@ -162,6 +161,8 @@ class LogSegment implements Comparable<Long> {
     @Override
     public LogEntryProto load(LogRecord key) throws IOException {
       final File file = getSegmentFile();
+      // note the loading should not exceed the endIndex: it is possible that
+      // the on-disk log file should be truncated but has not been done yet.
       readSegmentFile(file, startIndex, endIndex, isOpen,
           entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
       loadingTimes.incrementAndGet();
@@ -175,14 +176,15 @@ class LogSegment implements Comparable<Long> {
         storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
   }
 
-  private boolean isOpen;
+  private volatile boolean isOpen;
   private long totalSize;
   private final long startIndex;
-  private long endIndex;
+  private volatile long endIndex;
   private final RaftStorage storage;
   private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader();
   /** later replace it with a metric */
   private final AtomicInteger loadingTimes = new AtomicInteger();
+  private volatile boolean hasEntryCache;
 
   /**
    * the list of records is more like the index of a segment
@@ -200,6 +202,7 @@ class LogSegment implements Comparable<Long> {
     this.startIndex = start;
     this.endIndex = end;
     totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+    hasEntryCache = isOpen;
   }
 
   long getStartIndex() {
@@ -224,7 +227,7 @@ class LogSegment implements Comparable<Long> {
     append(true, entries);
   }
 
-  private void append(boolean keptInCache, LogEntryProto... entries) {
+  private void append(boolean keepEntryInCache, LogEntryProto... entries) {
     Preconditions.assertTrue(entries != null && entries.length > 0);
     final long term = entries[0].getTerm();
     if (records.isEmpty()) {
@@ -246,7 +249,7 @@ class LogSegment implements Comparable<Long> {
 
       final LogRecord record = new LogRecord(totalSize, entry);
       records.add(record);
-      if (keptInCache) {
+      if (keepEntryInCache) {
         entryCache.put(record.getTermIndex(), entry);
       }
       if (ProtoUtils.isConfigurationLogEntry(entry)) {
@@ -274,7 +277,9 @@ class LogSegment implements Comparable<Long> {
       return entry;
     }
     try {
-      return cacheLoader.load(record);
+      entry = cacheLoader.load(record);
+      hasEntryCache = true;
+      return entry;
     } catch (Exception e) {
       throw new RaftLogIOException(e);
     }
@@ -340,6 +345,7 @@ class LogSegment implements Comparable<Long> {
   void clear() {
     records.clear();
     entryCache.clear();
+    hasEntryCache = false;
     configEntries.clear();
     endIndex = startIndex - 1;
   }
@@ -347,4 +353,17 @@ class LogSegment implements Comparable<Long> {
   public int getLoadingTimes() {
     return loadingTimes.get();
   }
+
+  void evictCache() {
+    hasEntryCache = false;
+    entryCache.clear(); // only the cached entries go; records (the index) are kept
+  }
+
+  boolean hasCache() { // entries cached by append() never set the flag, so check the map too
+    return hasEntryCache || !entryCache.isEmpty();
+  }
+
+  boolean containsIndex(long index) {
+    return index >= startIndex && index <= endIndex;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 7f13dd8..5863f8d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -19,15 +19,21 @@ package org.apache.ratis.server.storage;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.function.Consumer;
 
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
 import org.apache.ratis.server.storage.LogSegment.LogRecord;
+import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 
 import org.apache.ratis.util.Preconditions;
@@ -65,17 +71,53 @@ class RaftLogCache {
           toDelete.toArray(new SegmentFileInfo[toDelete.size()]);
       this.toTruncate = toTruncate;
     }
+
+    int getDeletionSize() {
+      return toDelete == null ? 0 : toDelete.length;
+    }
   }
 
   private LogSegment openSegment;
   private final List<LogSegment> closedSegments;
   private final RaftStorage storage;
 
-  RaftLogCache(RaftStorage storage) {
+  private final int maxCachedSegments;
+  private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
+
+  RaftLogCache(RaftStorage storage, RaftProperties properties) {
     this.storage = storage;
+    maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
     closedSegments = new ArrayList<>();
   }
 
+  int getMaxCachedSegments() {
+    return maxCachedSegments;
+  }
+
+  void loadSegment(LogPathAndIndex pi, boolean isOpen, boolean keepEntryInCache,
+      Consumer<LogEntryProto> logConsumer) throws IOException {
+    LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(),
+        pi.startIndex, pi.endIndex, isOpen, keepEntryInCache, logConsumer);
+    addSegment(logSegment);
+  }
+
+  long getCachedSegmentNum() {
+    return closedSegments.stream().filter(LogSegment::hasCache).count();
+  }
+
+  boolean shouldEvict() {
+    return getCachedSegmentNum() > maxCachedSegments;
+  }
+
+  void evictCache(long[] followerIndices, long flushedIndex,
+      long lastAppliedIndex) {
+    // Ask the policy for victims among the closed segments only (the open
+    // segment is never passed in), then drop the entry cache of each
+    // segment it returns.
+    evictionPolicy.evict(followerIndices, flushedIndex, lastAppliedIndex,
+        closedSegments, maxCachedSegments).forEach(LogSegment::evictCache);
+  }
+
   private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) {
     return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex();
   }
@@ -345,7 +387,11 @@ class RaftLogCache {
   }
 
   void clear() {
-    openSegment = null;
+    if (openSegment != null) {
+      openSegment.clear();
+      openSegment = null;
+    }
+    closedSegments.forEach(LogSegment::clear);
     closedSegments.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 8eaa0ab..0c54af7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.CodeInjectionForTesting;
@@ -67,6 +68,7 @@ import java.util.function.Consumer;
 public class SegmentedRaftLog extends RaftLog {
   static final String HEADER_STR = "RAFTLOG1";
   static final byte[] HEADER_BYTES = HEADER_STR.getBytes(StandardCharsets.UTF_8);
+  static final LogSegment[] EMPTY_SEGMENT_ARRAY = new LogSegment[0];
 
   /**
    * I/O task definitions.
@@ -96,6 +98,7 @@ public class SegmentedRaftLog extends RaftLog {
   }
   private static final ThreadLocal<Task> myTask = new ThreadLocal<>();
 
+  private final RaftServerImpl server;
   private final RaftStorage storage;
   private final RaftLogCache cache;
   private final RaftLogWorker fileLogWorker;
@@ -105,9 +108,10 @@ public class SegmentedRaftLog extends RaftLog {
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties)
       throws IOException {
     super(selfId);
+    this.server = server;
     this.storage = storage;
-    this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    cache = new RaftLogCache(storage);
+    segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    cache = new RaftLogCache(storage, properties);
     fileLogWorker = new RaftLogWorker(server, storage, properties);
     lastCommitted.set(lastIndexInSnapshot);
   }
@@ -136,11 +140,18 @@ public class SegmentedRaftLog extends RaftLog {
       Consumer<LogEntryProto> logConsumer) throws IOException {
     try(AutoCloseableLock writeLock = writeLock()) {
       List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
+      int i = 0;
       for (LogPathAndIndex pi : paths) {
         boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
-        LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(),
-            pi.startIndex, pi.endIndex, isOpen, true, logConsumer);
-        cache.addSegment(logSegment);
+        // During the initial loading, we can only confirm the committed
+        // index based on the snapshot. This means if a log segment is not kept
+        // in cache after the initial loading, later we have to load its content
+        // again for updating the state machine.
+        // TODO we should let raft peer persist its committed index periodically
+        // so that during the initial loading we can apply part of the log
+        // entries to the state machine
+        boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
+        cache.loadSegment(pi, isOpen, keepEntryInCache, logConsumer);
       }
 
       // if the largest index is smaller than the last index in snapshot, we do
@@ -177,9 +188,21 @@ public class SegmentedRaftLog extends RaftLog {
 
     // the entry is not in the segment's cache. Load the cache without holding
     // RaftLog's lock.
+    checkAndEvictCache();
     return segment.loadCache(recordAndEntry.getRecord());
   }
 
+  private void checkAndEvictCache() {
+    // TODO if the cache is hitting the maximum size and we cannot evict any
+    // segment's cache, should block the new entry appending or new segment
+    // allocation.
+    if (server == null || !cache.shouldEvict()) {
+      return;
+    }
+    cache.evictCache(server.getFollowerNextIndices(),
+        fileLogWorker.getFlushedIndex(), server.getState().getLastAppliedIndex());
+  }
+
   @Override
   public TermIndex getTermIndex(long index) {
     checkLogState();
@@ -232,6 +255,7 @@ public class SegmentedRaftLog extends RaftLog {
       } else if (isSegmentFull(currentOpenSegment, entry)) {
         cache.rollOpenSegment(true);
         fileLogWorker.rollLogSegment(currentOpenSegment);
+        checkAndEvictCache();
       } else if (currentOpenSegment.numOfEntries() > 0 &&
           currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
         // the term changes

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
new file mode 100644
index 0000000..6df8cf7
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
+import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestCacheEviction {
+  private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
+  private static final ClientId clientId = ClientId.createId();
+  private static final long callId = 0;
+
+  private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, long start, long size) {
+    Assert.assertEquals(numSegments, cached.length);
+    List<LogSegment> segments = new ArrayList<>(numSegments);
+    for (int i = 0; i < numSegments; i++) {
+      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1);
+      if (cached[i]) {
+        s = Mockito.spy(s);
+        Mockito.when(s.hasCache()).thenReturn(true);
+      }
+      segments.add(s);
+      start += size;
+    }
+    return segments;
+  }
+
+  @Test
+  public void testBasicEviction() throws Exception {
+    final int maxCached = 5;
+    List<LogSegment> segments = prepareSegments(5,
+        new boolean[]{true, true, true, true, true}, 0, 10);
+
+    // case 1, make sure we do not evict cache for segments not yet fully flushed locally
+    List<LogSegment> evicted = policy.evict(null, 5, 15, segments, maxCached);
+    Assert.assertEquals(0, evicted.size());
+
+    // case 2, suppose the local flushed index is in the 3rd segment, then we
+    // can evict the first two segments
+    evicted = policy.evict(null, 25, 30, segments, maxCached);
+    Assert.assertEquals(2, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+    Assert.assertSame(evicted.get(1), segments.get(1));
+
+    // case 3, similar to case 2, but the local applied index is less than
+    // the local flushed index.
+    evicted = policy.evict(null, 25, 15, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+
+    // case 4, the local applied index is very small, then evict the cache of a
+    // flushed segment ahead of it and let the state machine load it later
+    evicted = policy.evict(null, 35, 5, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(2));
+
+    Mockito.when(segments.get(2).hasCache()).thenReturn(false);
+    evicted = policy.evict(null, 35, 5, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(1));
+
+    Mockito.when(segments.get(1).hasCache()).thenReturn(false);
+    evicted = policy.evict(null, 35, 5, segments, maxCached);
+    Assert.assertEquals(0, evicted.size());
+  }
+
+  @Test
+  public void testEvictionWithFollowerIndices() throws Exception {
+    final int maxCached = 6;
+    List<LogSegment> segments = prepareSegments(6,
+        new boolean[]{true, true, true, true, true, true}, 0, 10);
+
+    // case 1, no matter where the followers are, we do not evict segments that
+    // are not yet fully flushed to the local disk
+    List<LogSegment> evicted = policy.evict(new long[]{20, 40, 40}, 5, 15, segments,
+        maxCached);
+    Assert.assertEquals(0, evicted.size());
+
+    // case 2, the follower indices are all larger than the local flushed index
+    evicted = policy.evict(new long[]{30, 40, 45}, 25, 30, segments, maxCached);
+    Assert.assertEquals(2, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+    Assert.assertSame(evicted.get(1), segments.get(1));
+
+    // case 3, similar to case 3 in basic eviction test
+    evicted = policy.evict(new long[]{30, 40, 45}, 25, 15, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+
+    // case 4, the followers are slower than local flush
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+
+    Mockito.when(segments.get(0).hasCache()).thenReturn(false);
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(2));
+
+    Mockito.when(segments.get(2).hasCache()).thenReturn(false);
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(3));
+
+    Mockito.when(segments.get(3).hasCache()).thenReturn(false);
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(0, evicted.size());
+  }
+
+  /**
+   * End-to-end check of cache eviction inside a {@link SegmentedRaftLog}.
+   * The log is first filled with {@code maxCachedNum} ranges of 7 entries each
+   * while the mocked server reports no followers and an applied index of 0;
+   * two more ranges are then appended after the mocks are changed to report
+   * follower next indices {21, 40, 40} and an applied index of 35, and the
+   * number of cached segments is checked after each step.
+   *
+   * @throws Exception if the storage or the log cannot be created or written
+   */
+  @Test
+  public void testEvictionInSegmentedLog() throws Exception {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
+    RaftServerConfigKeys.Log.setPreallocatedSize(prop, SizeInBytes.valueOf("8KB"));
+    final RaftPeerId peerId = new RaftPeerId("s0");
+    final int maxCachedNum = RaftServerConfigKeys.Log.maxCachedSegmentNum(prop);
+
+    // Derive the directory from this test class rather than from
+    // TestSegmentedRaftLog, so the two test classes do not share on-disk state.
+    File storageDir = RaftTestUtil.getTestDir(getClass());
+    RaftServerConfigKeys.setStorageDir(prop, storageDir.getCanonicalPath());
+    // NOTE(review): the storage is not closed or deleted here; confirm whether
+    // RaftStorage needs explicit cleanup after the test.
+    RaftStorage storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
+
+    RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
+    ServerState state = Mockito.mock(ServerState.class);
+    Mockito.when(server.getState()).thenReturn(state);
+    Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{});
+    Mockito.when(state.getLastAppliedIndex()).thenReturn(0L);
+
+    // try-with-resources makes sure the log is closed even when an assertion
+    // fails, instead of leaving it open for the rest of the test run.
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, server, storage, -1, prop)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, 7, 0);
+      LogEntryProto[] entries = generateEntries(slist);
+      raftLog.append(entries);
+      raftLog.logSync();
+
+      // check the current cached segment number: the last segment is still open
+      Assert.assertEquals(maxCachedNum - 1,
+          raftLog.getRaftLogCache().getCachedSegmentNum());
+
+      Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{21, 40, 40});
+      Mockito.when(state.getLastAppliedIndex()).thenReturn(35L);
+      slist = TestSegmentedRaftLog.prepareRanges(2, 7, 7 * maxCachedNum);
+      entries = generateEntries(slist);
+      raftLog.append(entries);
+      raftLog.logSync();
+
+      // check the cached segment number again. since the slowest follower is on
+      // index 21, the eviction should happen and evict 3 segments
+      Assert.assertEquals(maxCachedNum + 1 - 3,
+          raftLog.getRaftLogCache().getCachedSegmentNum());
+    }
+  }
+
+  private LogEntryProto[] generateEntries(List<SegmentRange> slist) {
+    List<LogEntryProto> eList = new ArrayList<>();
+    for (SegmentRange range : slist) {
+      for (long index = range.start; index <= range.end; index++) {
+        SimpleOperation m = new SimpleOperation(new String(new byte[1024]));
+        eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
+            range.term, index, clientId, callId));
+      }
+    }
+    return eList.toArray(new LogEntryProto[eList.size()]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index 38c879b..86333f0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
@@ -33,12 +34,13 @@ import org.junit.Test;
 public class TestRaftLogCache {
   private static final ClientId clientId = ClientId.createId();
   private static final long callId = 0;
+  private static final RaftProperties prop = new RaftProperties();
 
   private RaftLogCache cache;
 
   @Before
   public void setup() {
-    cache = new RaftLogCache(null);
+    cache = new RaftLogCache(null, prop);
   }
 
   private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 1d38971..1c49e70 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -54,7 +54,7 @@ public class TestSegmentedRaftLog {
   private static final ClientId clientId = ClientId.createId();
   private static final long callId = 0;
 
-  private static class SegmentRange {
+  static class SegmentRange {
     final long start;
     final long end;
     final long term;
@@ -109,7 +109,7 @@ public class TestSegmentedRaftLog {
     return entryList.toArray(new LogEntryProto[entryList.size()]);
   }
 
-  private List<SegmentRange> prepareRanges(int number, int segmentSize,
+  static List<SegmentRange> prepareRanges(int number, int segmentSize,
       long startIndex) {
     List<SegmentRange> list = new ArrayList<>(number);
     for (int i = 0; i < number; i++) {