You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/04 10:31:56 UTC
[incubator-ratis] branch master updated: RATIS-1198. Fix OOM when use FileStore to write 1GB file (#317)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 66768c4 RATIS-1198. Fix OOM when use FileStore to write 1GB file (#317)
66768c4 is described below
commit 66768c430a220f30e4069ed80687ab30e0718d2e
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Dec 4 18:31:47 2020 +0800
RATIS-1198. Fix OOM when use FileStore to write 1GB file (#317)
---
.../ratis/netty/server/DataStreamManagement.java | 1 -
.../apache/ratis/server/RaftServerConfigKeys.java | 10 +++
.../ratis/server/raftlog/segmented/LogSegment.java | 76 ++++++++++++++++------
.../server/raftlog/segmented/SegmentedRaftLog.java | 16 +++--
.../raftlog/segmented/SegmentedRaftLogCache.java | 45 +++++++++----
.../server/raftlog/segmented/TestLogSegment.java | 24 +++----
.../raftlog/segmented/TestSegmentedRaftLog.java | 2 +-
.../segmented/TestSegmentedRaftLogCache.java | 6 +-
8 files changed, 123 insertions(+), 57 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 20be5bc..4ea4507 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -375,7 +375,6 @@ public class DataStreamManagement {
} else if (request.getType() == Type.STREAM_CLOSE) {
if (info.isPrimary()) {
// after all server close stream, primary server start transaction
- // TODO(runzhiwang): send start transaction to leader directly
startTransaction(info, request, ctx);
} else {
sendReply(remoteWrites, request, bytesWritten, ctx);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9543380..0b5a84f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -221,6 +221,16 @@ public interface RaftServerConfigKeys {
setInt(properties::setInt, SEGMENT_CACHE_NUM_MAX_KEY, maxCachedSegmentNum);
}
+ String SEGMENT_CACHE_SIZE_MAX_KEY = PREFIX + ".segment.cache.size.max";
+ SizeInBytes SEGMENT_CACHE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("200MB");
+ static SizeInBytes segmentCacheSizeMax(RaftProperties properties) {
+ return getSizeInBytes(properties::getSizeInBytes, SEGMENT_CACHE_SIZE_MAX_KEY,
+ SEGMENT_CACHE_SIZE_MAX_DEFAULT, getDefaultLog());
+ }
+ static void setSegmentCacheSizeMax(RaftProperties properties, SizeInBytes maxCachedSegmentSize) {
+ setSizeInBytes(properties::set, SEGMENT_CACHE_SIZE_MAX_KEY, maxCachedSegmentSize);
+ }
+
String PREALLOCATED_SIZE_KEY = PREFIX + ".preallocated.size";
SizeInBytes PREALLOCATED_SIZE_DEFAULT = SizeInBytes.valueOf("4MB");
static SizeInBytes preallocatedSize(RaftProperties properties) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index fb58a4a..fc0509e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -42,6 +42,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -60,8 +61,26 @@ public class LogSegment implements Comparable<Long> {
static final Logger LOG = LoggerFactory.getLogger(LogSegment.class);
- static long getEntrySize(LogEntryProto entry) {
- final int serialized = ServerProtoUtils.removeStateMachineData(entry).getSerializedSize();
+ enum Op {
+ LOAD_SEGMENT_FILE,
+ REMOVE_CACHE,
+ CHECK_SEGMENT_FILE_FULL,
+ WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
+ WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE
+ }
+
+ static long getEntrySize(LogEntryProto entry, Op op) {
+ LogEntryProto e = entry;
+ if (op == Op.CHECK_SEGMENT_FILE_FULL) {
+ e = ServerProtoUtils.removeStateMachineData(entry);
+ } else if (op == Op.LOAD_SEGMENT_FILE || op == Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE) {
+ Preconditions.assertTrue(entry == ServerProtoUtils.removeStateMachineData(entry),
+ () -> "Unexpected LogEntryProto with StateMachine data: op=" + op + ", entry=" + entry);
+ } else {
+ Preconditions.assertTrue(op == Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE || op == Op.REMOVE_CACHE,
+ () -> "Unexpected op " + op + ", entry=" + entry);
+ }
+ final int serialized = e.getSerializedSize();
return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4L;
}
@@ -139,7 +158,7 @@ public class LogSegment implements Comparable<Long> {
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, raftLogMetrics, entry -> {
- segment.append(keepEntryInCache || isOpen, entry);
+ segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
if (logConsumer != null) {
logConsumer.accept(entry);
}
@@ -161,9 +180,9 @@ public class LogSegment implements Comparable<Long> {
// The segment does not have any entries, delete the file.
FileUtils.deleteFile(file);
return null;
- } else if (file.length() > segment.getTotalSize()) {
+ } else if (file.length() > segment.getTotalFileSize()) {
// The segment has extra padding, truncate it.
- FileUtils.truncateFile(file, segment.getTotalSize());
+ FileUtils.truncateFile(file, segment.getTotalFileSize());
}
try {
@@ -211,7 +230,7 @@ public class LogSegment implements Comparable<Long> {
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics,
- entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
+ entry -> putEntryCache(ServerProtoUtils.toTermIndex(entry), entry, Op.LOAD_SEGMENT_FILE));
loadingTimes.incrementAndGet();
return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
}
@@ -224,7 +243,8 @@ public class LogSegment implements Comparable<Long> {
}
private volatile boolean isOpen;
- private long totalSize = SegmentedRaftLogFormat.getHeaderLength();
+ private long totalFileSize = SegmentedRaftLogFormat.getHeaderLength();
+ private AtomicLong totalCacheSize = new AtomicLong(0);
/** Segment start index, inclusive. */
private long startIndex;
/** Segment end index, inclusive. */
@@ -273,12 +293,12 @@ public class LogSegment implements Comparable<Long> {
return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
}
- void appendToOpenSegment(LogEntryProto entry) {
+ void appendToOpenSegment(LogEntryProto entry, Op op) {
Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this);
- append(true, entry);
+ append(true, entry, op);
}
- private void append(boolean keepEntryInCache, LogEntryProto entry) {
+ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
@@ -292,15 +312,15 @@ public class LogSegment implements Comparable<Long> {
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
}
- final LogRecord record = new LogRecord(totalSize, entry);
+ final LogRecord record = new LogRecord(totalFileSize, entry);
records.add(record);
if (keepEntryInCache) {
- entryCache.put(record.getTermIndex(), entry);
+ putEntryCache(record.getTermIndex(), entry, op);
}
if (entry.hasConfigurationEntry()) {
configEntries.add(record.getTermIndex());
}
- totalSize += getEntrySize(entry);
+ totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
}
@@ -343,8 +363,12 @@ public class LogSegment implements Comparable<Long> {
return configEntries.contains(ti);
}
- long getTotalSize() {
- return totalSize;
+ long getTotalFileSize() {
+ return totalFileSize;
+ }
+
+ long getTotalCacheSize() {
+ return totalCacheSize.get();
}
/**
@@ -354,9 +378,9 @@ public class LogSegment implements Comparable<Long> {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
- entryCache.remove(removed.getTermIndex());
+ removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
configEntries.remove(removed.getTermIndex());
- totalSize = removed.offset;
+ totalFileSize = removed.offset;
}
isOpen = false;
this.endIndex = fromIndex - 1;
@@ -381,7 +405,7 @@ public class LogSegment implements Comparable<Long> {
synchronized void clear() {
records.clear();
- entryCache.clear();
+ evictCache();
configEntries.clear();
endIndex = startIndex - 1;
}
@@ -390,8 +414,22 @@ public class LogSegment implements Comparable<Long> {
return loadingTimes.get();
}
- synchronized void evictCache() {
+ void evictCache() {
entryCache.clear();
+ totalCacheSize.set(0);
+ }
+
+ void putEntryCache(TermIndex key, LogEntryProto value, Op op) {
+ final LogEntryProto previous = entryCache.put(key, value);
+ Preconditions.assertNull(previous, "entryCache shouldn't contains duplicated entry");
+ totalCacheSize.getAndAdd(getEntrySize(value, op));
+ }
+
+ void removeEntryCache(TermIndex key, Op op) {
+ LogEntryProto value = entryCache.remove(key);
+ if (value != null) {
+ totalCacheSize.getAndAdd(-getEntrySize(value, op));
+ }
}
boolean hasCache() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1aa9616..c12dd66 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -380,7 +380,6 @@ public class SegmentedRaftLog extends RaftLog {
} else if (isSegmentFull(currentOpenSegment, entry)) {
cache.rollOpenSegment(true);
fileLogWorker.rollLogSegment(currentOpenSegment);
- checkAndEvictCache();
} else if (currentOpenSegment.numOfEntries() > 0 &&
currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
// the term changes
@@ -390,9 +389,11 @@ public class SegmentedRaftLog extends RaftLog {
currentTerm, entry.getTerm());
cache.rollOpenSegment(true);
fileLogWorker.rollLogSegment(currentOpenSegment);
- checkAndEvictCache();
}
+ //TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache
+ checkAndEvictCache();
+
// If the entry has state machine data, then the entry should be inserted
// to statemachine first and then to the cache. Not following the order
// will leave a spurious entry in the cache.
@@ -400,9 +401,10 @@ public class SegmentedRaftLog extends RaftLog {
fileLogWorker.writeLogEntry(entry).getFuture();
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine itself.
- cache.appendEntry(ServerProtoUtils.removeStateMachineData(entry));
+ cache.appendEntry(ServerProtoUtils.removeStateMachineData(entry),
+ LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
} else {
- cache.appendEntry(entry);
+ cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
return writeFuture;
} catch (Exception e) {
@@ -414,14 +416,14 @@ public class SegmentedRaftLog extends RaftLog {
}
private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
- if (segment.getTotalSize() >= segmentMaxSize) {
+ if (segment.getTotalFileSize() >= segmentMaxSize) {
return true;
} else {
- final long entrySize = LogSegment.getEntrySize(entry);
+ final long entrySize = LogSegment.getEntrySize(entry, LogSegment.Op.CHECK_SEGMENT_FILE_FULL);
// if entry size is greater than the max segment size, write it directly
// into the current segment
return entrySize <= segmentMaxSize &&
- segment.getTotalSize() + entrySize > segmentMaxSize;
+ segment.getTotalFileSize() + entrySize > segmentMaxSize;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 260718f..d32d994 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -167,10 +167,21 @@ public class SegmentedRaftLogCache {
}
}
- long sizeInBytes() {
+ long getTotalFileSize() {
return sizeInBytes;
}
+ long getTotalCacheSize() {
+ try(AutoCloseableLock readLock = readLock()) {
+ long size = 0;
+ // TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache.
+ for (LogSegment seg : segments) {
+ size += seg.getTotalCacheSize();
+ }
+ return size;
+ }
+ }
+
long countCached() {
try(AutoCloseableLock readLock = readLock()) {
return segments.stream().filter(LogSegment::hasCache).count();
@@ -231,7 +242,7 @@ public class SegmentedRaftLogCache {
boolean add(LogSegment logSegment) {
try(AutoCloseableLock writeLock = writeLock()) {
- sizeInBytes += logSegment.getTotalSize();
+ sizeInBytes += logSegment.getTotalFileSize();
return segments.add(logSegment);
}
}
@@ -258,9 +269,9 @@ public class SegmentedRaftLogCache {
openSegment.truncate(index);
Preconditions.assertTrue(!openSegment.isOpen());
final SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
- oldEnd, true, openSegment.getTotalSize(), openSegment.getEndIndex());
+ oldEnd, true, openSegment.getTotalFileSize(), openSegment.getEndIndex());
segments.add(openSegment);
- sizeInBytes += openSegment.getTotalSize();
+ sizeInBytes += openSegment.getTotalFileSize();
clearOpenSegment.run();
return new TruncationSegments(info, Collections.emptyList());
}
@@ -269,15 +280,15 @@ public class SegmentedRaftLogCache {
final LogSegment ts = segments.get(segmentIndex);
final long oldEnd = ts.getEndIndex();
final List<SegmentFileInfo> list = new ArrayList<>();
- sizeInBytes -= ts.getTotalSize();
+ sizeInBytes -= ts.getTotalFileSize();
ts.truncate(index);
- sizeInBytes += ts.getTotalSize();
+ sizeInBytes += ts.getTotalFileSize();
final int size = segments.size();
for(int i = size - 1;
i >= (ts.numOfEntries() == 0? segmentIndex: segmentIndex + 1);
i--) {
LogSegment s = segments.remove(i);
- sizeInBytes -= s.getTotalSize();
+ sizeInBytes -= s.getTotalFileSize();
final long endOfS = i == segmentIndex? oldEnd: s.getEndIndex();
s.clear();
list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, s.getEndIndex()));
@@ -286,7 +297,7 @@ public class SegmentedRaftLogCache {
list.add(deleteOpenSegment(openSegment, clearOpenSegment));
}
SegmentFileInfo t = ts.numOfEntries() == 0? null:
- new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, ts.getTotalSize(), ts.getEndIndex());
+ new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, ts.getTotalFileSize(), ts.getEndIndex());
return new TruncationSegments(t, list);
}
return null;
@@ -313,7 +324,7 @@ public class SegmentedRaftLogCache {
segmentIndex : segmentIndex - 1;
for (int i = startIndex; i >= 0; i--) {
LogSegment segment = segments.remove(i);
- sizeInBytes -= segment.getTotalSize();
+ sizeInBytes -= segment.getTotalFileSize();
list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
}
} else {
@@ -342,6 +353,7 @@ public class SegmentedRaftLogCache {
private final int maxCachedSegments;
private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
+ private final long maxSegmentCacheSize;
SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties,
RaftLogMetrics raftLogMetrics) {
@@ -353,6 +365,7 @@ public class SegmentedRaftLogCache {
this.raftLogMetrics.addClosedSegmentsSizeInBytes(this);
this.raftLogMetrics.addOpenSegmentSizeInBytes(this);
this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
+ this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
}
int getMaxCachedSegments() {
@@ -373,15 +386,19 @@ public class SegmentedRaftLogCache {
}
public long getClosedSegmentsSizeInBytes() {
- return closedSegments.sizeInBytes();
+ return closedSegments.getTotalFileSize();
}
public long getOpenSegmentSizeInBytes() {
- return openSegment.getTotalSize();
+ return openSegment.getTotalFileSize();
+ }
+
+ public long getTotalCacheSize() {
+ return closedSegments.getTotalCacheSize() + openSegment.getTotalCacheSize();
}
boolean shouldEvict() {
- return closedSegments.countCached() > maxCachedSegments;
+ return closedSegments.countCached() > maxCachedSegments || getTotalCacheSize() > maxSegmentCacheSize;
}
void evictCache(long[] followerIndices, long safeEvictIndex, long lastAppliedIndex) {
@@ -521,11 +538,11 @@ public class SegmentedRaftLogCache {
closedSegments.get(closedSegments.size() - 1).getLastTermIndex());
}
- void appendEntry(LogEntryProto entry) {
+ void appendEntry(LogEntryProto entry, LogSegment.Op op) {
// SegmentedRaftLog does the segment creation/rolling work. Here we just
// simply append the entry into the open segment.
Preconditions.assertTrue(openSegment != null);
- openSegment.appendToOpenSegment(entry);
+ openSegment.appendToOpenSegment(entry, op);
}
/**
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index ed365b9..0380cae 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -128,7 +128,7 @@ public class TestLogSegment extends BaseTest {
Assert.assertEquals(start, segment.getStartIndex());
Assert.assertEquals(end, segment.getEndIndex());
Assert.assertEquals(isOpen, segment.isOpen());
- Assert.assertEquals(totalSize, segment.getTotalSize());
+ Assert.assertEquals(totalSize, segment.getTotalFileSize());
long offset = SegmentedRaftLogFormat.getHeaderLength();
for (long i = start; i <= end; i++) {
@@ -142,7 +142,7 @@ public class TestLogSegment extends BaseTest {
if (entry == null) {
entry = segment.loadCache(record);
}
- offset += getEntrySize(entry);
+ offset += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
}
@@ -183,7 +183,7 @@ public class TestLogSegment extends BaseTest {
LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
1000, 1099, false, loadInitial, null, null);
checkLogSegment(closedSegment, 1000, 1099, false,
- closedSegment.getTotalSize(), 1);
+ closedSegment.getTotalFileSize(), 1);
Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
}
@@ -201,11 +201,11 @@ public class TestLogSegment extends BaseTest {
while (size < max) {
SimpleOperation op = new SimpleOperation("m" + i);
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
- size += getEntrySize(entry);
- segment.appendToOpenSegment(entry);
+ size += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
- Assert.assertTrue(segment.getTotalSize() >= max);
+ Assert.assertTrue(segment.getTotalFileSize() >= max);
checkLogSegment(segment, start, i - 1 + start, true, size, term);
}
@@ -234,18 +234,18 @@ public class TestLogSegment extends BaseTest {
final StateMachineLogEntryProto m = op.getLogEntryContent();
try {
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001);
- segment.appendToOpenSegment(entry);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
Assert.fail("should fail since the entry's index needs to be 1000");
} catch (IllegalStateException e) {
// the exception is expected.
}
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000);
- segment.appendToOpenSegment(entry);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
try {
entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002);
- segment.appendToOpenSegment(entry);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
Assert.fail("should fail since the entry's index needs to be 1001");
} catch (IllegalStateException e) {
// the exception is expected.
@@ -260,7 +260,7 @@ public class TestLogSegment extends BaseTest {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
- segment.appendToOpenSegment(entry);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
// truncate an open segment (remove 1080~1099)
@@ -313,7 +313,7 @@ public class TestLogSegment extends BaseTest {
1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
- size = LogSegment.getEntrySize(entry);
+ size = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
out.write(entry);
}
Assert.assertEquals(file.length(),
@@ -340,7 +340,7 @@ public class TestLogSegment extends BaseTest {
Arrays.fill(content, (byte) 1);
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
- final long entrySize = LogSegment.getEntrySize(entry);
+ final long entrySize = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
long totalSize = SegmentedRaftLogFormat.getHeaderLength();
long preallocated = 16 * 1024;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index e2cd6acc..4ed18be 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -74,7 +74,7 @@ public class TestSegmentedRaftLog extends BaseTest {
}
public static long getOpenSegmentSize(RaftLog raftLog) {
- return ((SegmentedRaftLog)raftLog).getRaftLogCache().getOpenSegment().getTotalSize();
+ return ((SegmentedRaftLog)raftLog).getRaftLogCache().getOpenSegment().getTotalFileSize();
}
private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 67775e6..30340ca 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -61,7 +61,7 @@ public class TestSegmentedRaftLogCache {
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
- s.appendToOpenSegment(entry);
+ s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
if (!isOpen) {
s.close();
@@ -152,7 +152,7 @@ public class TestSegmentedRaftLogCache {
final SimpleOperation m = new SimpleOperation("m");
try {
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0);
- cache.appendEntry(entry);
+ cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
Assert.fail("the open segment is null");
} catch (IllegalStateException ignored) {
}
@@ -161,7 +161,7 @@ public class TestSegmentedRaftLogCache {
cache.addSegment(openSegment);
for (long index = 101; index < 200; index++) {
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index);
- cache.appendEntry(entry);
+ cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
Assert.assertNotNull(cache.getOpenSegment());