You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/05/10 17:57:28 UTC
incubator-ratis git commit: RATIS-82. Add cache eviction policy.
Contributed by Jing Zhao.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 6eb4f8278 -> 2fbbe0aa9
RATIS-82. Add cache eviction policy. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2fbbe0aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2fbbe0aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2fbbe0aa
Branch: refs/heads/master
Commit: 2fbbe0aa9df31e5e982b235071e0b438f0241453
Parents: 6eb4f82
Author: Jing Zhao <ji...@apache.org>
Authored: Wed May 10 10:57:20 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed May 10 10:57:20 2017 -0700
----------------------------------------------------------------------
.../ratis/hadooprpc/TestRaftWithHadoopRpc.java | 2 +-
.../ratis/server/RaftServerConfigKeys.java | 11 +-
.../apache/ratis/server/impl/LeaderState.java | 4 +
.../ratis/server/impl/RaftServerImpl.java | 8 +
.../ratis/server/impl/ServerProtoUtils.java | 4 +-
.../server/storage/CacheInvalidationPolicy.java | 112 ++++++++++
.../apache/ratis/server/storage/LogSegment.java | 39 +++-
.../ratis/server/storage/RaftLogCache.java | 50 ++++-
.../ratis/server/storage/SegmentedRaftLog.java | 34 +++-
.../ratis/server/storage/TestCacheEviction.java | 202 +++++++++++++++++++
.../ratis/server/storage/TestRaftLogCache.java | 4 +-
.../server/storage/TestSegmentedRaftLog.java | 4 +-
12 files changed, 450 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
index 124e7ee..0519e52 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
@@ -31,7 +31,7 @@ import static org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServer
public class TestRaftWithHadoopRpc extends RaftBasicTests {
static {
- LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(RaftServerImpl.LOG, Level.TRACE);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index be0c6bc..abab5af 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -23,7 +23,6 @@ import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
import static org.apache.ratis.conf.ConfUtils.*;
@@ -76,6 +75,16 @@ public interface RaftServerConfigKeys {
setSizeInBytes(properties::set, SEGMENT_SIZE_MAX_KEY, segmentSizeMax);
}
+ /**
+ * Besides the open segment, the max number of segments caching log entries.
+ *
+ * {@code SEGMENT_CACHE_MAX_NUM_KEY} is the property name and
+ * {@code SEGMENT_CACHE_MAX_NUM_DEFAULT} is the value handed to {@code getInt}
+ * as the default. {@code maxCachedSegmentNum} reads the property with a
+ * {@code requireMin(0)} check (presumably rejecting negative values; confirm
+ * against ConfUtils).
+ */
+ String SEGMENT_CACHE_MAX_NUM_KEY = PREFIX + ".segment.cache.num.max";
+ int SEGMENT_CACHE_MAX_NUM_DEFAULT = 6;
+ static int maxCachedSegmentNum(RaftProperties properties) {
+ return getInt(properties::getInt, SEGMENT_CACHE_MAX_NUM_KEY,
+ SEGMENT_CACHE_MAX_NUM_DEFAULT, requireMin(0));
+ }
+
String PREALLOCATED_SIZE_KEY = PREFIX + ".preallocated.size";
SizeInBytes PREALLOCATED_SIZE_DEFAULT = SizeInBytes.valueOf("4MB");
static SizeInBytes preallocatedSize(RaftProperties properties) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 96bd28b..c4f92db 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -523,6 +523,10 @@ public class LeaderState {
return pendingRequests.getTransactionContext(index);
}
+ /**
+  * @return a new array holding, for each element of {@code senders}, the next
+  *         index of that sender's follower.
+  */
+ long[] getFollowerNextIndices() {
+   return senders.stream()
+       .mapToLong(sender -> sender.getFollower().getNextIndex())
+       .toArray();
+ }
+
private class ConfigurationStagingState {
private final Map<RaftPeerId, RaftPeer> newPeers;
private final PeerConfiguration newConf;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3c704b0..dca8b08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -913,6 +913,14 @@ public class RaftServerImpl implements RaftServer {
return null;
}
+ /**
+  * @return the followers' next indices taken from the current leader state, or
+  *         null when there is no leader state or this server is not the leader.
+  */
+ public synchronized long[] getFollowerNextIndices() {
+   // read the field once so the null check and the call see the same object
+   final LeaderState leader = this.leaderState;
+   final boolean leading = leader != null && isLeader();
+   return leading ? leader.getFollowerNextIndices() : null;
+ }
+
public void applyLogToStateMachine(LogEntryProto next) {
if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
// the reply should have already been set. only need to record
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 945be8d..6705e91 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -75,8 +75,8 @@ public class ServerProtoUtils {
}
private static String toString(RaftRpcReplyProto reply) {
- return reply.getRequestorId() + "->" + reply.getReplyId() + ","
- + reply.getSuccess();
+ return reply.getRequestorId().toStringUtf8() + "->"
+ + reply.getReplyId().toString() + "," + reply.getSuccess();
}
public static RaftConfigurationProto toRaftConfigurationProto(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
new file mode 100644
index 0000000..12534cf
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+ /**
+  * Policy deciding which log segments should drop their cached log entries.
+  */
+ public interface CacheInvalidationPolicy {
+   /**
+    * Determine which log segments should evict their log entry cache.
+    *
+    * @param followerNextIndices the next indices of all the follower peers. Null
+    *                            if the local peer is not a leader.
+    * @param localFlushedIndex the index that has been flushed to the local disk.
+    * @param lastAppliedIndex the last index that has been applied to state machine
+    * @param segments The list of log segments. The segments should be sorted in
+    *                 ascending order according to log index.
+    * @param maxCachedSegments the max number of segments with cached log entries
+    * @return the log segments that should evict cache
+    */
+   List<LogSegment> evict(long[] followerNextIndices, long localFlushedIndex,
+       long lastAppliedIndex, List<LogSegment> segments, int maxCachedSegments);
+
+   /**
+    * Default policy.
+    *
+    * Only segments up to the last closed segment whose entries are all flushed
+    * to the local disk are candidates.
+    * <ul>
+    * <li>Without followers: every cached candidate ending at or before
+    * {@code lastAppliedIndex} is returned. If there is none, the single cached
+    * candidate with the largest start index above {@code lastAppliedIndex} is
+    * returned.</li>
+    * <li>With followers: every cached candidate ending before
+    * {@code min(smallest follower next index, lastAppliedIndex)} is returned.
+    * If there is none, the first later cached candidate that contains neither
+    * a follower next index nor {@code lastAppliedIndex} is returned.</li>
+    * </ul>
+    * {@code maxCachedSegments} is not consulted by this implementation, and
+    * {@code followerNextIndices} is only read, never modified.
+    */
+   class CacheInvalidationPolicyDefault implements CacheInvalidationPolicy {
+     @Override
+     public List<LogSegment> evict(long[] followerNextIndices,
+         long localFlushedIndex, long lastAppliedIndex,
+         List<LogSegment> segments, final int maxCachedSegments) {
+       List<LogSegment> result = new ArrayList<>();
+       int safeIndex = segments.size() - 1;
+       for (; safeIndex >= 0; safeIndex--) {
+         LogSegment segment = segments.get(safeIndex);
+         // a segment's cache can be invalidated only if it is closed and all
+         // its entries have been flushed to the local disk
+         if (!segment.isOpen() && segment.getEndIndex() <= localFlushedIndex) {
+           break;
+         }
+       }
+       if (followerNextIndices == null || followerNextIndices.length == 0) {
+         // no followers, determine the eviction based on lastAppliedIndex
+         // first scan from the oldest segment to the one that is right before
+         // lastAppliedIndex. The caches of all these segments can be invalidated.
+         int j = 0;
+         for (; j <= safeIndex; j++) {
+           LogSegment segment = segments.get(j);
+           if (segment.getEndIndex() > lastAppliedIndex) {
+             break;
+           }
+           if (segment.hasCache()) {
+             result.add(segment);
+           }
+         }
+         // if there is no cache invalidation target found, pick a segment that
+         // later (but not now) the state machine will consume
+         if (result.isEmpty()) {
+           for (int i = safeIndex; i >= j; i--) {
+             LogSegment s = segments.get(i);
+             if (s.getStartIndex() > lastAppliedIndex && s.hasCache()) {
+               result.add(s);
+               break;
+             }
+           }
+         }
+       } else {
+         // this peer is the leader with followers. determine the eviction based
+         // on followers' next indices and the local lastAppliedIndex.
+         // Only the smallest next index is needed; take it with a scan instead
+         // of sorting so that the caller's array is left untouched. The array
+         // is non-empty in this branch, so min() always has a value.
+         final long minFollowerNext =
+             Arrays.stream(followerNextIndices).min().getAsLong();
+         // segments covering index minToRead will still be loaded. Thus we first
+         // try to evict cache for segments before minToRead.
+         final long minToRead = Math.min(minFollowerNext, lastAppliedIndex);
+         int j = 0;
+         for (; j <= safeIndex; j++) {
+           LogSegment s = segments.get(j);
+           if (s.getEndIndex() >= minToRead) {
+             break;
+           }
+           if (s.hasCache()) {
+             result.add(s);
+           }
+         }
+         // if there is no eviction target, continue the scanning and evict
+         // the one that is not being read currently.
+         if (result.isEmpty()) {
+           for (; j <= safeIndex; j++) {
+             LogSegment s = segments.get(j);
+             if (Arrays.stream(followerNextIndices).noneMatch(s::containsIndex)
+                 && !s.containsIndex(lastAppliedIndex) && s.hasCache()) {
+               result.add(s);
+               break;
+             }
+           }
+         }
+       }
+       return result;
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
index 6c478dd..ff7f353 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
@@ -30,8 +30,6 @@ import org.apache.ratis.util.ProtoUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -99,7 +97,8 @@ class LogSegment implements Comparable<Long> {
return new LogSegment(storage, true, start, start - 1);
}
- private static LogSegment newCloseSegment(RaftStorage storage,
+ @VisibleForTesting
+ static LogSegment newCloseSegment(RaftStorage storage,
long start, long end) {
Preconditions.assertTrue(start >= 0 && end >= start);
return new LogSegment(storage, false, start, end);
@@ -126,14 +125,14 @@ class LogSegment implements Comparable<Long> {
static LogSegment loadSegment(RaftStorage storage, File file,
long start, long end, boolean isOpen,
- boolean keptInCache, Consumer<LogEntryProto> logConsumer)
+ boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer)
throws IOException {
final LogSegment segment = isOpen ?
LogSegment.newOpenSegment(storage, start) :
LogSegment.newCloseSegment(storage, start, end);
readSegmentFile(file, start, end, isOpen, entry -> {
- segment.append(keptInCache | isOpen, entry);
+ segment.append(keepEntryInCache | isOpen, entry);
if (logConsumer != null) {
logConsumer.accept(entry);
}
@@ -162,6 +161,8 @@ class LogSegment implements Comparable<Long> {
@Override
public LogEntryProto load(LogRecord key) throws IOException {
final File file = getSegmentFile();
+ // Note: the loading must not read past endIndex, because the on-disk log
+ // file may still hold entries that are due to be truncated but have not been yet.
readSegmentFile(file, startIndex, endIndex, isOpen,
entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
loadingTimes.incrementAndGet();
@@ -175,14 +176,15 @@ class LogSegment implements Comparable<Long> {
storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
}
- private boolean isOpen;
+ private volatile boolean isOpen;
private long totalSize;
private final long startIndex;
- private long endIndex;
+ private volatile long endIndex;
private final RaftStorage storage;
private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader();
/** later replace it with a metric */
private final AtomicInteger loadingTimes = new AtomicInteger();
+ private volatile boolean hasEntryCache;
/**
* the list of records is more like the index of a segment
@@ -200,6 +202,7 @@ class LogSegment implements Comparable<Long> {
this.startIndex = start;
this.endIndex = end;
totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+ hasEntryCache = isOpen;
}
long getStartIndex() {
@@ -224,7 +227,7 @@ class LogSegment implements Comparable<Long> {
append(true, entries);
}
- private void append(boolean keptInCache, LogEntryProto... entries) {
+ private void append(boolean keepEntryInCache, LogEntryProto... entries) {
Preconditions.assertTrue(entries != null && entries.length > 0);
final long term = entries[0].getTerm();
if (records.isEmpty()) {
@@ -246,7 +249,7 @@ class LogSegment implements Comparable<Long> {
final LogRecord record = new LogRecord(totalSize, entry);
records.add(record);
- if (keptInCache) {
+ if (keepEntryInCache) {
entryCache.put(record.getTermIndex(), entry);
}
if (ProtoUtils.isConfigurationLogEntry(entry)) {
@@ -274,7 +277,9 @@ class LogSegment implements Comparable<Long> {
return entry;
}
try {
- return cacheLoader.load(record);
+ entry = cacheLoader.load(record);
+ hasEntryCache = true;
+ return entry;
} catch (Exception e) {
throw new RaftLogIOException(e);
}
@@ -340,6 +345,7 @@ class LogSegment implements Comparable<Long> {
+ /**
+ * Resets this segment to the empty state: the record index, the entry cache
+ * and the configuration entries are emptied, the entry-cache flag is cleared,
+ * and endIndex is set to startIndex - 1 (the end index an empty segment has).
+ */
void clear() {
records.clear();
entryCache.clear();
+ hasEntryCache = false;
configEntries.clear();
endIndex = startIndex - 1;
}
@@ -347,4 +353,17 @@ class LogSegment implements Comparable<Long> {
+ /**
+ * @return the current value of the loadingTimes counter, which the entry
+ * loader increments each time it reads this segment's file.
+ */
public int getLoadingTimes() {
return loadingTimes.get();
}
+
+ /**
+ * Drops every cached log entry of this segment and marks the segment as
+ * having no entry cache. Only the flag and the entry cache are touched here;
+ * the other fields of the segment are left as they are.
+ */
+ void evictCache() {
+ // clear the flag before the entries so hasCache() stops reporting true first
+ hasEntryCache = false;
+ entryCache.clear();
+ }
+
+ /**
+ * @return the entry-cache flag. It starts as isOpen in the constructor,
+ * becomes true after loadCache has loaded the entries, and becomes
+ * false in evictCache and clear.
+ */
+ // NOTE(review): in the visible hunks the flag is not set when a closed
+ // segment is loaded with keepEntryInCache == true, so such a segment would
+ // report false here although its entries are cached. Verify against append().
+ boolean hasCache() {
+ return hasEntryCache;
+ }
+
+ /**
+  * @param index the log index to test
+  * @return true iff index lies in [startIndex, endIndex], both ends inclusive.
+  */
+ boolean containsIndex(long index) {
+   return index >= startIndex && index <= endIndex;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 7f13dd8..5863f8d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -19,15 +19,21 @@ package org.apache.ratis.server.storage;
import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
import org.apache.ratis.server.storage.LogSegment.LogRecord;
+import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.Preconditions;
@@ -65,17 +71,53 @@ class RaftLogCache {
toDelete.toArray(new SegmentFileInfo[toDelete.size()]);
this.toTruncate = toTruncate;
}
+
+ /**
+  * @return the number of segment files to delete; 0 when toDelete is null.
+  */
+ int getDeletionSize() {
+   if (toDelete == null) {
+     return 0;
+   }
+   return toDelete.length;
+ }
}
private LogSegment openSegment;
private final List<LogSegment> closedSegments;
private final RaftStorage storage;
- RaftLogCache(RaftStorage storage) {
+ private final int maxCachedSegments;
+ private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
+
+ /**
+ * Creates a cache with no segments.
+ *
+ * @param storage the storage handed to LogSegment.loadSegment when segments
+ * are loaded
+ * @param properties the source of the max cached segment number
+ */
+ RaftLogCache(RaftStorage storage, RaftProperties properties) {
this.storage = storage;
+ maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
closedSegments = new ArrayList<>();
}
+ /**
+ * @return the max cached segment number read from the properties at
+ * construction time.
+ */
+ int getMaxCachedSegments() {
+ return maxCachedSegments;
+ }
+
+ /**
+  * Loads one segment file through LogSegment.loadSegment and adds the
+  * resulting segment to this cache.
+  *
+  * @param pi the path and the start/end indices of the segment file
+  * @param isOpen whether the file is an open segment
+  * @param keepEntryInCache passed through to LogSegment.loadSegment
+  * @param logConsumer passed through to LogSegment.loadSegment
+  * @throws IOException if LogSegment.loadSegment fails
+  */
+ void loadSegment(LogPathAndIndex pi, boolean isOpen, boolean keepEntryInCache,
+     Consumer<LogEntryProto> logConsumer) throws IOException {
+   addSegment(LogSegment.loadSegment(storage, pi.path.toFile(), pi.startIndex,
+       pi.endIndex, isOpen, keepEntryInCache, logConsumer));
+ }
+
+ /**
+  * @return the number of closed segments whose hasCache() is true. The open
+  *         segment is not counted.
+  */
+ long getCachedSegmentNum() {
+   long cached = 0;
+   for (LogSegment segment : closedSegments) {
+     if (segment.hasCache()) {
+       cached++;
+     }
+   }
+   return cached;
+ }
+
+ boolean shouldEvict() {
+ return getCachedSegmentNum() > maxCachedSegments;
+ }
+
+ /**
+  * Asks the eviction policy which closed segments to evict and evicts the
+  * entry cache of each returned segment.
+  *
+  * @param followerIndices the followers' next indices, passed to the policy
+  * @param flushedIndex the locally flushed index, passed to the policy
+  * @param lastAppliedIndex the last applied index, passed to the policy
+  */
+ void evictCache(long[] followerIndices, long flushedIndex,
+     long lastAppliedIndex) {
+   evictionPolicy
+       .evict(followerIndices, flushedIndex, lastAppliedIndex, closedSegments,
+           maxCachedSegments)
+       .forEach(LogSegment::evictCache);
+ }
+
+ /**
+ * @return true iff prev is closed and segment starts at the index right
+ * after prev's end index.
+ */
private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) {
return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex();
}
@@ -345,7 +387,11 @@ class RaftLogCache {
}
+ /**
+ * Clears every segment held by this cache (the open segment, if any, and all
+ * closed segments) and then drops the references to them.
+ */
void clear() {
- openSegment = null;
+ if (openSegment != null) {
+ openSegment.clear();
+ openSegment = null;
+ }
+ // clear each closed segment's own state before emptying the list
+ closedSegments.forEach(LogSegment::clear);
closedSegments.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 8eaa0ab..0c54af7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.LogSegment.LogRecord;
import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.CodeInjectionForTesting;
@@ -67,6 +68,7 @@ import java.util.function.Consumer;
public class SegmentedRaftLog extends RaftLog {
static final String HEADER_STR = "RAFTLOG1";
static final byte[] HEADER_BYTES = HEADER_STR.getBytes(StandardCharsets.UTF_8);
+ static final LogSegment[] EMPTY_SEGMENT_ARRAY = new LogSegment[0];
/**
* I/O task definitions.
@@ -96,6 +98,7 @@ public class SegmentedRaftLog extends RaftLog {
}
private static final ThreadLocal<Task> myTask = new ThreadLocal<>();
+ private final RaftServerImpl server;
private final RaftStorage storage;
private final RaftLogCache cache;
private final RaftLogWorker fileLogWorker;
@@ -105,9 +108,10 @@ public class SegmentedRaftLog extends RaftLog {
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties)
throws IOException {
super(selfId);
+ this.server = server;
this.storage = storage;
- this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
- cache = new RaftLogCache(storage);
+ segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+ cache = new RaftLogCache(storage, properties);
fileLogWorker = new RaftLogWorker(server, storage, properties);
lastCommitted.set(lastIndexInSnapshot);
}
@@ -136,11 +140,18 @@ public class SegmentedRaftLog extends RaftLog {
Consumer<LogEntryProto> logConsumer) throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
+ int i = 0;
for (LogPathAndIndex pi : paths) {
boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
- LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(),
- pi.startIndex, pi.endIndex, isOpen, true, logConsumer);
- cache.addSegment(logSegment);
+ // During the initial loading, we can only confirm the committed
+ // index based on the snapshot. This means if a log segment is not kept
+ // in cache after the initial loading, later we have to load its content
+ // again for updating the state machine.
+ // TODO we should let raft peer persist its committed index periodically
+ // so that during the initial loading we can apply part of the log
+ // entries to the state machine
+ boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
+ cache.loadSegment(pi, isOpen, keepEntryInCache, logConsumer);
}
// if the largest index is smaller than the last index in snapshot, we do
@@ -177,9 +188,21 @@ public class SegmentedRaftLog extends RaftLog {
// the entry is not in the segment's cache. Load the cache without holding
// RaftLog's lock.
+ checkAndEvictCache();
return segment.loadCache(recordAndEntry.getRecord());
}
+ private void checkAndEvictCache() {
+ if (server != null && cache.shouldEvict()) {
+ // TODO if the cache is hitting the maximum size and we cannot evict any
+ // segment's cache, should block the new entry appending or new segment
+ // allocation.
+ cache.evictCache(server.getFollowerNextIndices(),
+ fileLogWorker.getFlushedIndex(),
+ server.getState().getLastAppliedIndex());
+ }
+ }
+
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
@@ -232,6 +255,7 @@ public class SegmentedRaftLog extends RaftLog {
} else if (isSegmentFull(currentOpenSegment, entry)) {
cache.rollOpenSegment(true);
fileLogWorker.rollLogSegment(currentOpenSegment);
+ checkAndEvictCache();
} else if (currentOpenSegment.numOfEntries() > 0 &&
currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
// the term changes
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
new file mode 100644
index 0000000..6df8cf7
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
+import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+ /**
+  * Tests for log entry cache eviction: the default
+  * {@link CacheInvalidationPolicy} on hand-built segment lists, and the
+  * eviction triggered by {@link SegmentedRaftLog} while entries are appended.
+  */
+ public class TestCacheEviction {
+   private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
+   private static final ClientId clientId = ClientId.createId();
+   private static final long callId = 0;
+
+   /**
+    * Builds {@code numSegments} consecutive closed segments of {@code size}
+    * entries each, beginning at index {@code start}. Segment i is replaced by a
+    * Mockito spy reporting hasCache() == true when {@code cached[i]} is set.
+    */
+   private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, long start, long size) {
+     Assert.assertEquals(numSegments, cached.length);
+     List<LogSegment> segments = new ArrayList<>(numSegments);
+     for (int i = 0; i < numSegments; i++) {
+       LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1);
+       if (cached[i]) {
+         s = Mockito.spy(s);
+         Mockito.when(s.hasCache()).thenReturn(true);
+       }
+       segments.add(s);
+       start += size;
+     }
+     return segments;
+   }
+
+   /** Eviction without followers (followerNextIndices is null). */
+   @Test
+   public void testBasicEviction() throws Exception {
+     final int maxCached = 5;
+     List<LogSegment> segments = prepareSegments(5,
+         new boolean[]{true, true, true, true, true}, 0, 10);
+
+     // case 1, make sure we do not evict cache for segments that are not yet
+     // fully flushed, i.e. whose end index is beyond the local flushed index
+     List<LogSegment> evicted = policy.evict(null, 5, 15, segments, maxCached);
+     Assert.assertEquals(0, evicted.size());
+
+     // case 2, suppose the local flushed index is in the 3rd segment, then we
+     // can evict the first two segments
+     evicted = policy.evict(null, 25, 30, segments, maxCached);
+     Assert.assertEquals(2, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(0));
+     Assert.assertSame(evicted.get(1), segments.get(1));
+
+     // case 3, similar to case 2, but the local applied index is less than
+     // the local flushed index.
+     evicted = policy.evict(null, 25, 15, segments, maxCached);
+     Assert.assertEquals(1, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(0));
+
+     // case 4, the local applied index is very small: no flushed segment has
+     // been fully applied, so evict the flushed segment farthest ahead of the
+     // applied index and let the state machine load it again later
+     evicted = policy.evict(null, 35, 5, segments, maxCached);
+     Assert.assertEquals(1, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(2));
+
+     Mockito.when(segments.get(2).hasCache()).thenReturn(false);
+     evicted = policy.evict(null, 35, 5, segments, maxCached);
+     Assert.assertEquals(1, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(1));
+
+     Mockito.when(segments.get(1).hasCache()).thenReturn(false);
+     evicted = policy.evict(null, 35, 5, segments, maxCached);
+     Assert.assertEquals(0, evicted.size());
+   }
+
+   /** Eviction on a leader, driven by the followers' next indices. */
+   @Test
+   public void testEvictionWithFollowerIndices() throws Exception {
+     final int maxCached = 6;
+     List<LogSegment> segments = prepareSegments(6,
+         new boolean[]{true, true, true, true, true, true}, 0, 10);
+
+     // case 1, no matter where the followers are, we do not evict segments that
+     // are not yet fully flushed locally
+     List<LogSegment> evicted = policy.evict(new long[]{20, 40, 40}, 5, 15, segments,
+         maxCached);
+     Assert.assertEquals(0, evicted.size());
+
+     // case 2, the followers' next indices are all beyond the local flushed index
+     evicted = policy.evict(new long[]{30, 40, 45}, 25, 30, segments, maxCached);
+     Assert.assertEquals(2, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(0));
+     Assert.assertSame(evicted.get(1), segments.get(1));
+
+     // case 3, similar to case 3 in the basic eviction test
+     evicted = policy.evict(new long[]{30, 40, 45}, 25, 15, segments, maxCached);
+     Assert.assertEquals(1, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(0));
+
+     // case 4, the followers are slower than local flush
+     evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+     Assert.assertEquals(1, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(0));
+
+     Mockito.when(segments.get(0).hasCache()).thenReturn(false);
+     evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+     Assert.assertEquals(1, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(2));
+
+     Mockito.when(segments.get(2).hasCache()).thenReturn(false);
+     evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+     Assert.assertEquals(1, evicted.size());
+     Assert.assertSame(evicted.get(0), segments.get(3));
+
+     Mockito.when(segments.get(3).hasCache()).thenReturn(false);
+     evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+     Assert.assertEquals(0, evicted.size());
+   }
+
+   /** Eviction as triggered by SegmentedRaftLog against a mocked server. */
+   @Test
+   public void testEvictionInSegmentedLog() throws Exception {
+     final RaftProperties prop = new RaftProperties();
+     prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+         SimpleStateMachine4Testing.class, StateMachine.class);
+     RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
+     RaftServerConfigKeys.Log.setPreallocatedSize(prop, SizeInBytes.valueOf("8KB"));
+     final RaftPeerId peerId = new RaftPeerId("s0");
+     final int maxCachedNum = RaftServerConfigKeys.Log.maxCachedSegmentNum(prop);
+
+     // use this test's own directory so it does not share on-disk state with
+     // TestSegmentedRaftLog
+     File storageDir = RaftTestUtil.getTestDir(TestCacheEviction.class);
+     RaftServerConfigKeys.setStorageDir(prop, storageDir.getCanonicalPath());
+     RaftStorage storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
+
+     RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
+     ServerState state = Mockito.mock(ServerState.class);
+     Mockito.when(server.getState()).thenReturn(state);
+     Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{});
+     Mockito.when(state.getLastAppliedIndex()).thenReturn(0L);
+
+     SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, prop);
+     raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+     List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, 7, 0);
+     LogEntryProto[] entries = generateEntries(slist);
+     raftLog.append(entries);
+     raftLog.logSync();
+
+     // check the current cached segment number: the last segment is still open
+     Assert.assertEquals(maxCachedNum - 1,
+         raftLog.getRaftLogCache().getCachedSegmentNum());
+
+     Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{21, 40, 40});
+     Mockito.when(state.getLastAppliedIndex()).thenReturn(35L);
+     slist = TestSegmentedRaftLog.prepareRanges(2, 7, 7 * maxCachedNum);
+     entries = generateEntries(slist);
+     raftLog.append(entries);
+     raftLog.logSync();
+
+     // check the cached segment number again. since the slowest follower is on
+     // index 21, the eviction should happen and evict 3 segments
+     Assert.assertEquals(maxCachedNum + 1 - 3,
+         raftLog.getRaftLogCache().getCachedSegmentNum());
+   }
+
+   /**
+    * Creates one log entry per index of every range in {@code slist}, each
+    * carrying a SimpleOperation built from a 1024-byte zero-filled array.
+    */
+   private LogEntryProto[] generateEntries(List<SegmentRange> slist) {
+     List<LogEntryProto> eList = new ArrayList<>();
+     for (SegmentRange range : slist) {
+       for (long index = range.start; index <= range.end; index++) {
+         SimpleOperation m = new SimpleOperation(new String(new byte[1024]));
+         eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
+             range.term, index, clientId, callId));
+       }
+     }
+     return eList.toArray(new LogEntryProto[eList.size()]);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index 38c879b..86333f0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Iterator;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
@@ -33,12 +34,13 @@ import org.junit.Test;
public class TestRaftLogCache {
private static final ClientId clientId = ClientId.createId();
private static final long callId = 0;
+ private static final RaftProperties prop = new RaftProperties();
private RaftLogCache cache;
@Before
public void setup() {
- cache = new RaftLogCache(null);
+ cache = new RaftLogCache(null, prop);
}
private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fbbe0aa/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 1d38971..1c49e70 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -54,7 +54,7 @@ public class TestSegmentedRaftLog {
private static final ClientId clientId = ClientId.createId();
private static final long callId = 0;
- private static class SegmentRange {
+ static class SegmentRange {
final long start;
final long end;
final long term;
@@ -109,7 +109,7 @@ public class TestSegmentedRaftLog {
return entryList.toArray(new LogEntryProto[entryList.size()]);
}
- private List<SegmentRange> prepareRanges(int number, int segmentSize,
+ static List<SegmentRange> prepareRanges(int number, int segmentSize,
long startIndex) {
List<SegmentRange> list = new ArrayList<>(number);
for (int i = 0; i < number; i++) {