You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/04/26 17:56:33 UTC
incubator-ratis git commit: RATIS-76. Add loading policy for RaftLogCache. Contributed by Jing Zhao.
Repository: incubator-ratis
Updated Branches:
refs/heads/master b2c691382 -> 13fdb9ee6
RATIS-76. Add loading policy for RaftLogCache. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/13fdb9ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/13fdb9ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/13fdb9ee
Branch: refs/heads/master
Commit: 13fdb9ee6cb89a2bfa3533ab7955b33951de491b
Parents: b2c6913
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Apr 26 10:55:00 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Apr 26 10:55:16 2017 -0700
----------------------------------------------------------------------
.../ratis/grpc/server/GRpcLogAppender.java | 10 +-
.../org/apache/ratis/grpc/TestRaftStream.java | 2 +-
.../apache/ratis/server/impl/LogAppender.java | 16 +-
.../apache/ratis/server/storage/LogSegment.java | 168 ++++++++++++++-----
.../ratis/server/storage/MemoryRaftLog.java | 1 -
.../apache/ratis/server/storage/RaftLog.java | 2 +-
.../ratis/server/storage/RaftLogCache.java | 42 +++--
.../server/storage/RaftLogIOException.java | 29 ++++
.../ratis/server/storage/SegmentedRaftLog.java | 41 +++--
.../java/org/apache/ratis/RaftTestUtil.java | 20 ++-
.../ratis/server/storage/TestRaftLogCache.java | 27 +--
.../server/storage/TestRaftLogSegment.java | 39 +++--
.../server/storage/TestSegmentedRaftLog.java | 21 ++-
13 files changed, 300 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 656adc2..92dd257 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.grpc.server;
+import org.apache.ratis.server.storage.RaftLogIOException;
import org.apache.ratis.shaded.io.grpc.Status;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
@@ -79,7 +80,12 @@ public class GRpcLogAppender extends LogAppender {
installSnapshot(snapshot, snapshotResponseHandler);
} else {
// keep appending log entries or sending heartbeats
- appendLog();
+ try {
+ appendLog();
+ } catch (RaftLogIOException e) {
+ LOG.error(this + " hit IOException while loading raft log", e);
+ stopSender();
+ }
}
}
@@ -107,7 +113,7 @@ public class GRpcLogAppender extends LogAppender {
shouldWaitForFirstResponse();
}
- private void appendLog() {
+ private void appendLog() throws RaftLogIOException {
if (appendLogRequestObserver == null) {
appendLogRequestObserver = client.appendEntries(appendResponseHandler);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 83ca5ec..1efe4d3 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -104,7 +104,7 @@ public class TestRaftStream {
}
private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
- Supplier<byte[]> s) {
+ Supplier<byte[]> s) throws IOException {
long committedIndex = raftLog.getLastCommittedIndex();
Assert.assertEquals(expectedCommittedIndex, committedIndex);
// check the log content
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index ff12f4e..a48d236 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftLogIOException;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -87,6 +88,8 @@ public class LogAppender extends Daemon {
checkAndSendAppendEntries();
} catch (InterruptedException | InterruptedIOException e) {
LOG.info(this + " was interrupted: " + e);
+ } catch (RaftLogIOException e) {
+ LOG.error(this + " hit IOException while loading raft log", e);
}
}
@@ -150,7 +153,7 @@ public class LogAppender extends Daemon {
return previous;
}
- protected AppendEntriesRequestProto createRequest() {
+ protected AppendEntriesRequestProto createRequest() throws RaftLogIOException {
final TermIndex previous = getPrevious();
final long leaderNext = raftLog.getNextIndex();
long next = follower.getNextIndex() + buffer.getPendingEntryNum();
@@ -178,7 +181,7 @@ public class LogAppender extends Daemon {
/** Send an appendEntries RPC; retry indefinitely. */
private AppendEntriesReplyProto sendAppendEntriesWithRetries()
- throws InterruptedException, InterruptedIOException {
+ throws InterruptedException, InterruptedIOException, RaftLogIOException {
int retry = 0;
AppendEntriesRequestProto request = null;
while (isAppenderRunning()) { // keep retrying for IOException
@@ -202,9 +205,10 @@ public class LogAppender extends Daemon {
follower.updateLastRpcResponseTime();
return r;
- } catch (InterruptedIOException iioe) {
- throw iioe;
+ } catch (InterruptedIOException | RaftLogIOException e) {
+ throw e;
} catch (IOException ioe) {
+ // TODO should have more detailed retry policy here.
LOG.trace(this + ": failed to send appendEntries; retry " + retry++, ioe);
}
if (isAppenderRunning()) {
@@ -279,7 +283,7 @@ public class LogAppender extends Daemon {
InstallSnapshotRequestProto request =
server.createInstallSnapshotRequest(follower.getPeer().getId(),
requestId, requestIndex++, snapshot,
- Arrays.asList(chunk), done);
+ Collections.singletonList(chunk), done);
currentOffset += targetLength;
chunkIndex++;
@@ -372,7 +376,7 @@ public class LogAppender extends Daemon {
/** Check and send appendEntries RPC */
private void checkAndSendAppendEntries()
- throws InterruptedException, InterruptedIOException {
+ throws InterruptedException, InterruptedIOException, RaftLogIOException {
while (isAppenderRunning()) {
if (shouldSendRequest()) {
SnapshotInfo snapshot = shouldInstallSnapshot();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
index 3856585..6c478dd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
@@ -17,8 +17,10 @@
*/
package org.apache.ratis.server.storage;
+import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.shaded.com.google.common.cache.CacheLoader;
import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.FileUtils;
@@ -28,11 +30,15 @@ import org.apache.ratis.util.ProtoUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
@@ -47,7 +53,6 @@ class LogSegment implements Comparable<Long> {
return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4;
}
- @VisibleForTesting
static class LogRecord {
/** starting offset in the file */
private final long offset;
@@ -67,44 +72,42 @@ class LogSegment implements Comparable<Long> {
}
}
- private boolean isOpen;
- private long totalSize;
- private final long startIndex;
- private long endIndex;
- /**
- * the list of records is more like the index of a segment
- */
- private final List<LogRecord> records = new ArrayList<>();
- /**
- * the entryCache caches the content of log entries.
- * TODO: currently we cache all the log entries. will fix it soon.
- */
- private final Map<TermIndex, LogEntryProto> entryCache = new HashMap<>();
- private final Set<TermIndex> configEntries = new HashSet<>();
+ static class LogRecordWithEntry {
+ private final LogRecord record;
+ private final LogEntryProto entry;
- private LogSegment(boolean isOpen, long start, long end) {
- this.isOpen = isOpen;
- this.startIndex = start;
- this.endIndex = end;
- totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+ LogRecordWithEntry(LogRecord record, LogEntryProto entry) {
+ this.record = record;
+ this.entry = entry;
+ }
+
+ LogRecord getRecord() {
+ return record;
+ }
+
+ LogEntryProto getEntry() {
+ return entry;
+ }
+
+ boolean hasEntry() {
+ return entry != null;
+ }
}
- static LogSegment newOpenSegment(long start) {
+ static LogSegment newOpenSegment(RaftStorage storage, long start) {
Preconditions.assertTrue(start >= 0);
- return new LogSegment(true, start, start - 1);
+ return new LogSegment(storage, true, start, start - 1);
}
- private static LogSegment newCloseSegment(long start, long end) {
+ private static LogSegment newCloseSegment(RaftStorage storage,
+ long start, long end) {
Preconditions.assertTrue(start >= 0 && end >= start);
- return new LogSegment(false, start, end);
+ return new LogSegment(storage, false, start, end);
}
- static LogSegment loadSegment(File file, long start, long end, boolean isOpen,
- Consumer<LogEntryProto> logConsumer) throws IOException {
- final LogSegment segment;
+ private static void readSegmentFile(File file, long start, long end,
+ boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws IOException {
try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
- segment = isOpen ? LogSegment.newOpenSegment(start) :
- LogSegment.newCloseSegment(start, end);
LogEntryProto next;
LogEntryProto prev = null;
while ((next = in.nextEntry()) != null) {
@@ -112,14 +115,29 @@ class LogSegment implements Comparable<Long> {
Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
"gap between entry %s and entry %s", prev, next);
}
- segment.append(next);
- if (logConsumer != null) {
- logConsumer.accept(next);
+ if (entryConsumer != null) {
+ entryConsumer.accept(next);
}
prev = next;
}
}
+ }
+
+ static LogSegment loadSegment(RaftStorage storage, File file,
+ long start, long end, boolean isOpen,
+ boolean keptInCache, Consumer<LogEntryProto> logConsumer)
+ throws IOException {
+ final LogSegment segment = isOpen ?
+ LogSegment.newOpenSegment(storage, start) :
+ LogSegment.newCloseSegment(storage, start, end);
+
+ readSegmentFile(file, start, end, isOpen, entry -> {
+ segment.append(keptInCache | isOpen, entry);
+ if (logConsumer != null) {
+ logConsumer.accept(entry);
+ }
+ });
// truncate padding if necessary
if (file.length() > segment.getTotalSize()) {
@@ -133,6 +151,57 @@ class LogSegment implements Comparable<Long> {
return segment;
}
+ /**
+ * The current log entry loader simply loads the whole segment into memory.
+ * In most cases this should be good enough, considering that the main use case
+ * for loading log entries is the leader appending entries to followers.
+ *
+ * In the future we can make the cache loader configurable if necessary.
+ */
+ class LogEntryLoader extends CacheLoader<LogRecord, LogEntryProto> {
+ @Override
+ public LogEntryProto load(LogRecord key) throws IOException {
+ final File file = getSegmentFile();
+ readSegmentFile(file, startIndex, endIndex, isOpen,
+ entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
+ loadingTimes.incrementAndGet();
+ return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
+ }
+ }
+
+ private File getSegmentFile() {
+ return isOpen ?
+ storage.getStorageDir().getOpenLogFile(startIndex) :
+ storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
+ }
+
+ private boolean isOpen;
+ private long totalSize;
+ private final long startIndex;
+ private long endIndex;
+ private final RaftStorage storage;
+ private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader();
+ /** later replace it with a metric */
+ private final AtomicInteger loadingTimes = new AtomicInteger();
+
+ /**
+ * the list of records is more like the index of a segment
+ */
+ private final List<LogRecord> records = new ArrayList<>();
+ /**
+ * the entryCache caches the content of log entries.
+ */
+ private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
+ private final Set<TermIndex> configEntries = new HashSet<>();
+
+ private LogSegment(RaftStorage storage, boolean isOpen, long start, long end) {
+ this.storage = storage;
+ this.isOpen = isOpen;
+ this.startIndex = start;
+ this.endIndex = end;
+ totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+ }
+
long getStartIndex() {
return startIndex;
}
@@ -152,10 +221,10 @@ class LogSegment implements Comparable<Long> {
void appendToOpenSegment(LogEntryProto... entries) {
Preconditions.assertTrue(isOpen(),
"The log segment %s is not open for append", this.toString());
- append(entries);
+ append(true, entries);
}
- private void append(LogEntryProto... entries) {
+ private void append(boolean keptInCache, LogEntryProto... entries) {
Preconditions.assertTrue(entries != null && entries.length > 0);
final long term = entries[0].getTerm();
if (records.isEmpty()) {
@@ -177,7 +246,9 @@ class LogSegment implements Comparable<Long> {
final LogRecord record = new LogRecord(totalSize, entry);
records.add(record);
- entryCache.put(record.getTermIndex(), entry);
+ if (keptInCache) {
+ entryCache.put(record.getTermIndex(), entry);
+ }
if (ProtoUtils.isConfigurationLogEntry(entry)) {
configEntries.add(record.getTermIndex());
}
@@ -186,14 +257,27 @@ class LogSegment implements Comparable<Long> {
}
}
- LogEntryProto getLogEntry(long index) {
+ LogRecordWithEntry getEntryWithoutLoading(long index) {
LogRecord record = getLogRecord(index);
- return record == null ? null : entryCache.get(record.getTermIndex());
+ if (record == null) {
+ return null;
+ }
+ return new LogRecordWithEntry(record, entryCache.get(record.getTermIndex()));
}
- TermIndex getTermIndex(long index) {
- LogRecord record = getLogRecord(index);
- return record == null ? null : record.getTermIndex();
+ /**
+ * Acquire LogSegment's monitor so that there is no concurrent loading.
+ */
+ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException {
+ LogEntryProto entry = entryCache.get(record.getTermIndex());
+ if (entry != null) {
+ return entry;
+ }
+ try {
+ return cacheLoader.load(record);
+ } catch (Exception e) {
+ throw new RaftLogIOException(e);
+ }
}
LogRecord getLogRecord(long index) {
@@ -259,4 +343,8 @@ class LogSegment implements Comparable<Long> {
configEntries.clear();
endIndex = startIndex - 1;
}
+
+ public int getLoadingTimes() {
+ return loadingTimes.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index df71d08..ecbae2e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -154,7 +154,6 @@ public class MemoryRaftLog extends RaftLog {
if (toTruncate) {
truncate(truncateIndex);
}
- // Collections.addAll(this.entries, entries);
for (int i = index; i < entries.length; i++) {
this.entries.add(entries[i]);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 5002f83..4d84a57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -179,7 +179,7 @@ public abstract class RaftLog implements Closeable {
* @return The log entry associated with the given index.
* Null if there is no log entry with the index.
*/
- public abstract LogEntryProto get(long index);
+ public abstract LogEntryProto get(long index) throws RaftLogIOException;
/**
* Get the TermIndex information of the given index.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 0142bee..7f13dd8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -27,6 +27,7 @@ import java.util.NoSuchElementException;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.LogSegment.LogRecord;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.Preconditions;
@@ -68,8 +69,10 @@ class RaftLogCache {
private LogSegment openSegment;
private final List<LogSegment> closedSegments;
+ private final RaftStorage storage;
- RaftLogCache() {
+ RaftLogCache(RaftStorage storage) {
+ this.storage = storage;
closedSegments = new ArrayList<>();
}
@@ -116,13 +119,13 @@ class RaftLogCache {
openSegment.close();
closedSegments.add(openSegment);
if (createNewOpen) {
- openSegment = LogSegment.newOpenSegment(nextIndex);
+ openSegment = LogSegment.newOpenSegment(storage, nextIndex);
} else {
openSegment = null;
}
}
- private LogSegment getSegment(long index) {
+ LogSegment getSegment(long index) {
if (openSegment != null && index >= openSegment.getStartIndex()) {
return openSegment;
} else {
@@ -131,21 +134,16 @@ class RaftLogCache {
}
}
- LogEntryProto getEntry(long index) {
+ LogRecord getLogRecord(long index) {
LogSegment segment = getSegment(index);
- return segment == null ? null : segment.getLogEntry(index);
- }
-
- TermIndex getTermIndex(long index) {
- LogSegment segment = getSegment(index);
- return segment == null ? null : segment.getTermIndex(index);
+ return segment == null ? null : segment.getLogRecord(index);
}
/**
* @param startIndex inclusive
* @param endIndex exclusive
*/
- TermIndex[] getEntries(final long startIndex, final long endIndex) {
+ TermIndex[] getTermIndices(final long startIndex, final long endIndex) {
if (startIndex < 0 || startIndex < getStartIndex()) {
throw new IndexOutOfBoundsException("startIndex = " + startIndex
+ ", log cache starts from index " + getStartIndex());
@@ -162,19 +160,19 @@ class RaftLogCache {
TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)];
int segmentIndex = Collections.binarySearch(closedSegments, startIndex);
if (segmentIndex < 0) {
- getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length);
+ getFromSegment(openSegment, startIndex, entries, 0, entries.length);
} else {
long index = startIndex;
for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) {
LogSegment s = closedSegments.get(i);
int numberFromSegment = Math.toIntExact(
Math.min(realEnd - index, s.getEndIndex() - index + 1));
- getEntriesFromSegment(s, index, entries,
+ getFromSegment(s, index, entries,
Math.toIntExact(index - startIndex), numberFromSegment);
index += numberFromSegment;
}
if (index < realEnd) {
- getEntriesFromSegment(openSegment, index, entries,
+ getFromSegment(openSegment, index, entries,
Math.toIntExact(index - startIndex),
Math.toIntExact(realEnd - index));
}
@@ -182,13 +180,13 @@ class RaftLogCache {
return entries;
}
- private void getEntriesFromSegment(LogSegment segment, long startIndex,
+ private void getFromSegment(LogSegment segment, long startIndex,
TermIndex[] entries, int offset, int size) {
long endIndex = segment.getEndIndex();
endIndex = Math.min(endIndex, startIndex + size - 1);
int index = offset;
for (long i = startIndex; i <= endIndex; i++) {
- LogSegment.LogRecord r = segment.getLogRecord(i);
+ LogRecord r = segment.getLogRecord(i);
entries[index++] = r == null ? null : r.getTermIndex();
}
}
@@ -286,11 +284,11 @@ class RaftLogCache {
return null;
}
- Iterator<LogEntryProto> iterator(long startIndex) {
+ Iterator<TermIndex> iterator(long startIndex) {
return new EntryIterator(startIndex);
}
- private class EntryIterator implements Iterator<LogEntryProto> {
+ private class EntryIterator implements Iterator<TermIndex> {
private long nextIndex;
private LogSegment currentSegment;
private int segmentIndex;
@@ -321,10 +319,10 @@ class RaftLogCache {
}
@Override
- public LogEntryProto next() {
- LogEntryProto entry;
+ public TermIndex next() {
+ LogRecord record;
if (currentSegment == null ||
- (entry = currentSegment.getLogEntry(nextIndex)) == null) {
+ (record = currentSegment.getLogRecord(nextIndex)) == null) {
throw new NoSuchElementException();
}
if (++nextIndex > currentSegment.getEndIndex()) {
@@ -334,7 +332,7 @@ class RaftLogCache {
openSegment : closedSegments.get(segmentIndex);
}
}
- return entry;
+ return record.getTermIndex();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
new file mode 100644
index 0000000..b9fb5ff
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.protocol.RaftException;
+
+/**
+ * Exception while reading/writing RaftLog
+ */
+public class RaftLogIOException extends RaftException {
+ public RaftLogIOException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index c956f84..8eaa0ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -23,6 +23,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.LogSegment.LogRecord;
+import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.AutoCloseableLock;
@@ -105,7 +107,7 @@ public class SegmentedRaftLog extends RaftLog {
super(selfId);
this.storage = storage;
this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
- cache = new RaftLogCache();
+ cache = new RaftLogCache(storage);
fileLogWorker = new RaftLogWorker(server, storage, properties);
lastCommitted.set(lastIndexInSnapshot);
}
@@ -136,8 +138,8 @@ public class SegmentedRaftLog extends RaftLog {
List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
for (LogPathAndIndex pi : paths) {
boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
- LogSegment logSegment = LogSegment.loadSegment(pi.path.toFile(),
- pi.startIndex, pi.endIndex, isOpen, logConsumer);
+ LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(),
+ pi.startIndex, pi.endIndex, isOpen, true, logConsumer);
cache.addSegment(logSegment);
}
@@ -155,18 +157,35 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- public LogEntryProto get(long index) {
+ public LogEntryProto get(long index) throws RaftLogIOException {
checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- return cache.getEntry(index);
+ LogSegment segment;
+ LogRecordWithEntry recordAndEntry;
+ try (AutoCloseableLock readLock = readLock()) {
+ segment = cache.getSegment(index);
+ if (segment == null) {
+ return null;
+ }
+ recordAndEntry = segment.getEntryWithoutLoading(index);
+ if (recordAndEntry == null) {
+ return null;
+ }
+ if (recordAndEntry.hasEntry()) {
+ return recordAndEntry.getEntry();
+ }
}
+
+ // the entry is not in the segment's cache. Load the cache without holding
+ // RaftLog's lock.
+ return segment.loadCache(recordAndEntry.getRecord());
}
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
- return cache.getTermIndex(index);
+ LogRecord record = cache.getLogRecord(index);
+ return record != null ? record.getTermIndex() : null;
}
}
@@ -174,7 +193,7 @@ public class SegmentedRaftLog extends RaftLog {
public TermIndex[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
- return cache.getEntries(startIndex, endIndex);
+ return cache.getTermIndices(startIndex, endIndex);
}
}
@@ -208,7 +227,7 @@ public class SegmentedRaftLog extends RaftLog {
try(AutoCloseableLock writeLock = writeLock()) {
final LogSegment currentOpenSegment = cache.getOpenSegment();
if (currentOpenSegment == null) {
- cache.addSegment(LogSegment.newOpenSegment(entry.getIndex()));
+ cache.addSegment(LogSegment.newOpenSegment(storage, entry.getIndex()));
fileLogWorker.startLogSegment(getNextIndex());
} else if (isSegmentFull(currentOpenSegment, entry)) {
cache.rollOpenSegment(true);
@@ -248,11 +267,11 @@ public class SegmentedRaftLog extends RaftLog {
return;
}
try(AutoCloseableLock writeLock = writeLock()) {
- Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex());
+ Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex());
int index = 0;
long truncateIndex = -1;
for (; iter.hasNext() && index < entries.length; index++) {
- LogEntryProto storedEntry = iter.next();
+ TermIndex storedEntry = iter.next();
Preconditions.assertTrue(
storedEntry.getIndex() == entries[index].getIndex(),
"The stored entry's index %s is not consistent with" +
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 76258e5..0680df9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -101,10 +101,13 @@ public class RaftTestUtil {
TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
while (idxEntries < termIndices.length
&& idxExpected < expectedMessages.length) {
- if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
- log.get(termIndices[idxEntries].getIndex()).getSmLogEntry()
- .getData().toByteArray())) {
- ++idxExpected;
+ try {
+ if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
+ log.get(termIndices[idxEntries].getIndex()).getSmLogEntry().getData().toByteArray())) {
+ ++idxExpected;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
++idxEntries;
}
@@ -112,7 +115,7 @@ public class RaftTestUtil {
}
public static void assertLogEntries(Collection<RaftServerImpl> servers,
- SimpleMessage... expectedMessages) {
+ SimpleMessage... expectedMessages) {
final int size = servers.size();
final long count = servers.stream()
.filter(RaftServerImpl::isAlive)
@@ -129,7 +132,12 @@ public class RaftTestUtil {
long startIndex, long expertedTerm, SimpleMessage... expectedMessages) {
Assert.assertEquals(expectedMessages.length, entries.length);
for(int i = 0; i < entries.length; i++) {
- final LogEntryProto e = log.get(entries[i].getIndex());
+ final LogEntryProto e;
+ try {
+ e = log.get(entries[i].getIndex());
+ } catch (IOException exception) {
+ throw new RuntimeException(exception);
+ }
Assert.assertEquals(expertedTerm, e.getTerm());
Assert.assertEquals(startIndex + i, e.getIndex());
Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index d9f7c10..38c879b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.storage;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
@@ -37,11 +38,11 @@ public class TestRaftLogCache {
@Before
public void setup() {
- cache = new RaftLogCache();
+ cache = new RaftLogCache(null); // NOTE(review): null presumably means "no backing RaftStorage" in this unit test (cf. LogSegment.newOpenSegment(null, ...)); confirm
}
private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
- LogSegment s = LogSegment.newOpenSegment(start);
+ LogSegment s = LogSegment.newOpenSegment(null, start);
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
@@ -54,12 +55,12 @@ public class TestRaftLogCache {
return s;
}
- private void checkCache(long start, long end, int segmentSize) {
+ private void checkCache(long start, long end, int segmentSize) throws IOException {
Assert.assertEquals(start, cache.getStartIndex());
Assert.assertEquals(end, cache.getEndIndex());
for (long index = start; index <= end; index++) {
- LogEntryProto entry = cache.getEntry(index);
+ LogEntryProto entry = cache.getSegment(index).getEntryWithoutLoading(index).getEntry();
Assert.assertEquals(index, entry.getIndex());
}
@@ -75,7 +76,7 @@ public class TestRaftLogCache {
}
private void checkCacheEntries(long offset, int size, long end) {
- TermIndex[] entries = cache.getEntries(offset, offset + size);
+ TermIndex[] entries = cache.getTermIndices(offset, offset + size);
long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
Assert.assertEquals(realEnd - offset, entries.length);
for (long i = offset; i < realEnd; i++) {
@@ -220,16 +221,16 @@ public class TestRaftLogCache {
Assert.assertEquals(249, ts.toTruncate.endIndex);
}
- private void testIterator(long startIndex) {
- Iterator<LogEntryProto> iterator = cache.iterator(startIndex);
- LogEntryProto prev = null;
+ private void testIterator(long startIndex) throws IOException {
+ Iterator<TermIndex> iterator = cache.iterator(startIndex);
+ TermIndex prev = null;
while (iterator.hasNext()) {
- LogEntryProto entry = iterator.next();
- Assert.assertEquals(cache.getEntry(entry.getIndex()), entry);
+ TermIndex termIndex = iterator.next();
+ Assert.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
if (prev != null) {
- Assert.assertEquals(prev.getIndex() + 1, entry.getIndex());
+ Assert.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
}
- prev = entry;
+ prev = termIndex;
}
if (startIndex <= cache.getEndIndex()) {
Assert.assertNotNull(prev);
@@ -254,7 +255,7 @@ public class TestRaftLogCache {
}
testIterator(299);
- Iterator<LogEntryProto> iterator = cache.iterator(300);
+ Iterator<TermIndex> iterator = cache.iterator(300);
Assert.assertFalse(iterator.hasNext());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index 4e90d75..73709fc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -86,7 +87,7 @@ public class TestRaftLogSegment {
}
private void checkLogSegment(LogSegment segment, long start, long end,
- boolean isOpen, long totalSize, long term) {
+ boolean isOpen, long totalSize, long term) throws Exception {
Assert.assertEquals(start, segment.getStartIndex());
Assert.assertEquals(end, segment.getEndIndex());
Assert.assertEquals(isOpen, segment.isOpen());
@@ -95,35 +96,51 @@ public class TestRaftLogSegment {
long offset = SegmentedRaftLog.HEADER_BYTES.length;
for (long i = start; i <= end; i++) {
LogSegment.LogRecord record = segment.getLogRecord(i);
- LogEntryProto entry = segment.getLogEntry(i);
- Assert.assertEquals(i, entry.getIndex());
- Assert.assertEquals(term, entry.getTerm());
+ LogRecordWithEntry lre = segment.getEntryWithoutLoading(i);
+ Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex());
+ Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm());
Assert.assertEquals(offset, record.getOffset());
+ LogEntryProto entry = lre.hasEntry() ?
+ lre.getEntry() : segment.loadCache(lre.getRecord());
offset += getEntrySize(entry);
}
}
@Test
public void testLoadLogSegment() throws Exception {
+ testLoadSegment(true); // loadInitial=true: closed segment should need no extra cache loads
+ }
+
+ @Test
+ public void testLoadCache() throws Exception {
+ testLoadSegment(false); // loadInitial=false: closed segment entries are loaded once via loadCache
+ }
+
+ private void testLoadSegment(boolean loadInitial) throws Exception {
// load an open segment
File openSegmentFile = prepareLog(true, 0, 100, 0);
- LogSegment openSegment = LogSegment.loadSegment(openSegmentFile, 0,
- INVALID_LOG_INDEX, true, null);
+ RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
+ try { // storage is shared by both segments; close it only after both are verified
+ LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
+ INVALID_LOG_INDEX, true, loadInitial, null);
checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0);
+ Assert.assertEquals(0, openSegment.getLoadingTimes()); // for open segment we currently always keep log entries in memory
// load a closed segment (1000-1099)
File closedSegmentFile = prepareLog(false, 1000, 100, 1);
- LogSegment closedSegment = LogSegment.loadSegment(closedSegmentFile, 1000,
- 1099, false, null);
+ LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
+ 1000, 1099, false, loadInitial, null);
checkLogSegment(closedSegment, 1000, 1099, false,
closedSegment.getTotalSize(), 1);
+ Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
+ } finally { storage.close(); } // also releases storage if an assertion above fails
}
@Test
public void testAppendEntries() throws Exception {
final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(start);
+ LogSegment segment = LogSegment.newOpenSegment(null, start);
long size = SegmentedRaftLog.HEADER_BYTES.length;
final long max = 8 * 1024 * 1024;
checkLogSegment(segment, start, start - 1, true, size, 0);
@@ -147,7 +164,7 @@ public class TestRaftLogSegment {
@Test
public void testAppendWithGap() throws Exception {
- LogSegment segment = LogSegment.newOpenSegment(1000);
+ LogSegment segment = LogSegment.newOpenSegment(null, 1000);
SimpleOperation op = new SimpleOperation("m");
final SMLogEntryProto m = op.getLogEntryContent();
try {
@@ -185,7 +202,7 @@ public class TestRaftLogSegment {
public void testTruncate() throws Exception {
final long term = 1;
final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(start);
+ LogSegment segment = LogSegment.newOpenSegment(null, start);
for (int i = 0; i < 100; i++) {
LogEntryProto entry = ProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index afd7d29..1d38971 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -120,7 +120,8 @@ public class TestSegmentedRaftLog {
return list;
}
- private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) {
+ private LogEntryProto getLastEntry(SegmentedRaftLog raftLog)
+ throws IOException { // raftLog.get now declares IOException, so propagate it to the test
return raftLog.get(raftLog.getLastEntryTermIndex().getIndex());
}
@@ -142,7 +143,13 @@ public class TestSegmentedRaftLog {
TermIndex[] termIndices = raftLog.getEntries(0, 500);
LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
- .map(ti -> raftLog.get(ti.getIndex()))
+ .map(ti -> {
+ try {
+ return raftLog.get(ti.getIndex());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
.collect(Collectors.toList())
.toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY);
Assert.assertArrayEquals(entries, entriesFromLog);
@@ -265,7 +272,7 @@ public class TestSegmentedRaftLog {
}
private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected,
- int offset, int size) {
+ int offset, int size) throws IOException {
if (size > 0) {
for (int i = offset; i < size + offset; i++) {
LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
@@ -275,7 +282,13 @@ public class TestSegmentedRaftLog {
expected.get(offset).getIndex(),
expected.get(offset + size - 1).getIndex() + 1);
LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
- .map(ti -> raftLog.get(ti.getIndex()))
+ .map(ti -> {
+ try {
+ return raftLog.get(ti.getIndex());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
.collect(Collectors.toList())
.toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY);
LogEntryProto[] expectedArray = expected.subList(offset, offset + size)