You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/01 01:22:26 UTC
[ratis] 10/16: RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 4a0496b188cd96f05fbab10a4327714bb8408d17
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Feb 16 16:34:18 2023 -0800
RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)
(cherry picked from commit c4f54dfe88d1b574a03688f16a592cf0d59d13a9)
---
.../ratis/server/raftlog/segmented/LogSegment.java | 47 +++++++++++-----------
.../raftlog/segmented/SegmentedRaftLogCache.java | 7 +++-
.../segmented/SegmentedRaftLogInputStream.java | 15 ++++---
.../raftlog/segmented/SegmentedRaftLogReader.java | 27 +++++++------
.../segmented/SegmentedRaftLogTestUtils.java | 10 +++++
.../statemachine/SimpleStateMachine4Testing.java | 3 +-
.../raftlog/segmented/TestCacheEviction.java | 4 +-
.../server/raftlog/segmented/TestLogSegment.java | 20 ++++-----
.../raftlog/segmented/TestRaftLogReadWrite.java | 7 ++--
.../segmented/TestSegmentedRaftLogCache.java | 3 +-
.../java/org/apache/ratis/tools/ParseRatisLog.java | 14 +++++--
11 files changed, 94 insertions(+), 63 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index c3c4d6e53..b8e0e72ff 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -31,6 +31,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,35 +103,31 @@ public final class LogSegment implements Comparable<Long> {
}
}
- static LogSegment newOpenSegment(RaftStorage storage, long start, SegmentedRaftLogMetrics raftLogMetrics) {
+ static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
+ SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
- return new LogSegment(storage, true, start, start - 1, raftLogMetrics);
+ return new LogSegment(storage, true, start, start - 1, maxOpSize, raftLogMetrics);
}
@VisibleForTesting
static LogSegment newCloseSegment(RaftStorage storage,
- long start, long end, SegmentedRaftLogMetrics raftLogMetrics) {
+ long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0 && end >= start);
- return new LogSegment(storage, false, start, end, raftLogMetrics);
- }
-
- static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SegmentedRaftLogMetrics metrics) {
- return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), metrics)
- : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), metrics);
+ return new LogSegment(storage, false, start, end, maxOpSize, raftLogMetrics);
}
- public static int readSegmentFile(File file, LogSegmentStartEnd startEnd,
- CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
- throws IOException {
- return readSegmentFile(file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(),
- corruptionPolicy, raftLogMetrics, entryConsumer);
+ static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
+ SegmentedRaftLogMetrics metrics) {
+ return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), maxOpSize, metrics)
+ : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), maxOpSize, metrics);
}
- private static int readSegmentFile(File file, long start, long end, boolean isOpen,
+ public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
throws IOException {
int count = 0;
- try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen, raftLogMetrics)) {
+ try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
+ file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(), maxOpSize, raftLogMetrics)) {
for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
if (prev != null) {
Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
@@ -146,8 +143,8 @@ public final class LogSegment implements Comparable<Long> {
switch (corruptionPolicy) {
case EXCEPTION: throw ioe;
case WARN_AND_RETURN:
- LOG.warn("Failed to read segment file {} (start={}, end={}, isOpen? {}): only {} entries read successfully",
- file, start, end, isOpen, count, ioe);
+ LOG.warn("Failed to read segment file {} ({}): only {} entries read successfully",
+ file, startEnd, count, ioe);
break;
default:
throw new IllegalStateException("Unexpected enum value: " + corruptionPolicy
@@ -158,13 +155,13 @@ public final class LogSegment implements Comparable<Long> {
return count;
}
- static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd,
+ static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, SegmentedRaftLogMetrics raftLogMetrics)
throws IOException {
- final LogSegment segment = newLogSegment(storage, startEnd, raftLogMetrics);
+ final LogSegment segment = newLogSegment(storage, startEnd, maxOpSize, raftLogMetrics);
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
final boolean isOpen = startEnd.isOpen();
- final int entryCount = readSegmentFile(file, startEnd, corruptionPolicy, raftLogMetrics, entry -> {
+ final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> {
segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
if (logConsumer != null) {
logConsumer.accept(entry);
@@ -235,7 +232,9 @@ public final class LogSegment implements Comparable<Long> {
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
- readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics, entry -> {
+ final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
+ readSegmentFile(file, startEnd, maxOpSize,
+ getLogCorruptionPolicy(), raftLogMetrics, entry -> {
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
@@ -259,6 +258,7 @@ public final class LogSegment implements Comparable<Long> {
/** Segment end index, inclusive. */
private volatile long endIndex;
private RaftStorage storage;
+ private final SizeInBytes maxOpSize;
private final LogEntryLoader cacheLoader;
/** later replace it with a metric */
private final AtomicInteger loadingTimes = new AtomicInteger();
@@ -272,12 +272,13 @@ public final class LogSegment implements Comparable<Long> {
*/
private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
- private LogSegment(RaftStorage storage, boolean isOpen, long start, long end,
+ private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, SizeInBytes maxOpSize,
SegmentedRaftLogMetrics raftLogMetrics) {
this.storage = storage;
this.isOpen = isOpen;
this.startIndex = start;
this.endIndex = end;
+ this.maxOpSize = maxOpSize;
this.cacheLoader = new LogEntryLoader(raftLogMetrics);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index b6f932d6a..e42f451d0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -32,6 +32,7 @@ import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AutoCloseableReadWriteLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -350,6 +351,7 @@ public class SegmentedRaftLogCache {
private volatile LogSegment openSegment;
private final LogSegmentList closedSegments;
private final RaftStorage storage;
+ private final SizeInBytes maxOpSize;
private final SegmentedRaftLogMetrics raftLogMetrics;
private final int maxCachedSegments;
@@ -367,6 +369,7 @@ public class SegmentedRaftLogCache {
this.raftLogMetrics.addOpenSegmentSizeInBytes(this);
this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
+ this.maxOpSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
}
int getMaxCachedSegments() {
@@ -376,7 +379,7 @@ public class SegmentedRaftLogCache {
void loadSegment(LogSegmentPath pi, boolean keepEntryInCache,
Consumer<LogEntryProto> logConsumer) throws IOException {
final LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), pi.getStartEnd(),
- keepEntryInCache, logConsumer, raftLogMetrics);
+ maxOpSize, keepEntryInCache, logConsumer, raftLogMetrics);
if (logSegment != null) {
addSegment(logSegment);
}
@@ -434,7 +437,7 @@ public class SegmentedRaftLogCache {
}
void addOpenSegment(long startIndex) {
- setOpenSegment(LogSegment.newOpenSegment(storage, startIndex,raftLogMetrics));
+ setOpenSegment(LogSegment.newOpenSegment(storage, startIndex, maxOpSize, raftLogMetrics));
}
private void setOpenSegment(LogSegment openSegment) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index e445b1abb..481f837f5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,14 +67,12 @@ public class SegmentedRaftLogInputStream implements Closeable {
private final boolean isOpen;
private final OpenCloseState state;
private SegmentedRaftLogReader reader;
+ private final SizeInBytes maxOpSize;
private final SegmentedRaftLogMetrics raftLogMetrics;
- public SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen) {
- this(log, startIndex, endIndex, isOpen, null);
- }
-
SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen,
- SegmentedRaftLogMetrics raftLogMetrics) {
+ SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
+ this.maxOpSize = maxOpSize;
if (isOpen) {
Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX);
} else {
@@ -92,7 +91,7 @@ public class SegmentedRaftLogInputStream implements Closeable {
state.open();
boolean initSuccess = false;
try {
- reader = new SegmentedRaftLogReader(logFile, raftLogMetrics);
+ reader = new SegmentedRaftLogReader(logFile, maxOpSize, raftLogMetrics);
initSuccess = reader.verifyHeader();
} finally {
if (!initSuccess) {
@@ -191,11 +190,11 @@ public class SegmentedRaftLogInputStream implements Closeable {
* @return Result of the validation
* @throws IOException
*/
- static LogValidation scanEditLog(File file, long maxTxIdToScan)
+ static LogValidation scanEditLog(File file, long maxTxIdToScan, SizeInBytes maxOpSize)
throws IOException {
SegmentedRaftLogInputStream in;
try {
- in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, null);
+ in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, maxOpSize, null);
// read the header, initialize the inputstream
in.init();
} catch (EOFException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index dc67d31c4..f1179de84 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -27,6 +27,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.PureJavaCrc32C;
+import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,21 +134,22 @@ class SegmentedRaftLogReader implements Closeable {
}
}
- private static final int MAX_OP_SIZE = 32 * 1024 * 1024;
-
private final File file;
private final LimitedInputStream limiter;
private final DataInputStream in;
private byte[] temp = new byte[4096];
private final Checksum checksum;
private final SegmentedRaftLogMetrics raftLogMetrics;
+ private final SizeInBytes maxOpSize;
- SegmentedRaftLogReader(File file, SegmentedRaftLogMetrics raftLogMetrics) throws FileNotFoundException {
+ SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics)
+ throws FileNotFoundException {
this.file = file;
this.limiter = new LimitedInputStream(
new BufferedInputStream(new FileInputStream(file)));
in = new DataInputStream(limiter);
checksum = new PureJavaCrc32C();
+ this.maxOpSize = maxOpSize;
this.raftLogMetrics = raftLogMetrics;
}
@@ -273,8 +275,9 @@ class SegmentedRaftLogReader implements Closeable {
* @return The log entry, or null if we hit EOF.
*/
private LogEntryProto decodeEntry() throws IOException {
- limiter.setLimit(MAX_OP_SIZE);
- in.mark(MAX_OP_SIZE);
+ final int max = maxOpSize.getSizeInt();
+ limiter.setLimit(max);
+ in.mark(max);
byte nextByte;
try {
@@ -294,17 +297,17 @@ class SegmentedRaftLogReader implements Closeable {
// Here, we verify that the Op size makes sense and that the
// data matches its checksum before attempting to construct an Op.
int entryLength = CodedInputStream.readRawVarint32(nextByte, in);
- if (entryLength > MAX_OP_SIZE) {
+ if (entryLength > max) {
throw new IOException("Entry has size " + entryLength
- + ", but MAX_OP_SIZE = " + MAX_OP_SIZE);
+ + ", but MAX_OP_SIZE = " + maxOpSize);
}
final int varintLength = CodedOutputStream.computeUInt32SizeNoTag(
entryLength);
final int totalLength = varintLength + entryLength;
- checkBufferSize(totalLength);
+ checkBufferSize(totalLength, max);
in.reset();
- in.mark(MAX_OP_SIZE);
+ in.mark(max);
IOUtils.readFully(in, temp, 0, totalLength);
// verify checksum
@@ -323,12 +326,12 @@ class SegmentedRaftLogReader implements Closeable {
CodedInputStream.newInstance(temp, varintLength, entryLength));
}
- private void checkBufferSize(int entryLength) {
- Preconditions.assertTrue(entryLength <= MAX_OP_SIZE);
+ private void checkBufferSize(int entryLength, int max) {
+ Preconditions.assertTrue(entryLength <= max);
int length = temp.length;
if (length < entryLength) {
while (length < entryLength) {
- length = Math.min(length * 2, MAX_OP_SIZE);
+ length = Math.min(length * 2, max);
}
temp = new byte[length];
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
index 04527e728..5dfa4de10 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
@@ -19,8 +19,18 @@ package org.apache.ratis.server.raftlog.segmented;
import org.apache.log4j.Level;
import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.SizeInBytes;
+
+import java.io.File;
public interface SegmentedRaftLogTestUtils {
+ SizeInBytes MAX_OP_SIZE = SizeInBytes.valueOf("32MB");
+
+ static SegmentedRaftLogInputStream newSegmentedRaftLogInputStream(File log,
+ long startIndex, long endIndex, boolean isOpen) {
+ return new SegmentedRaftLogInputStream(log, startIndex, endIndex, isOpen, MAX_OP_SIZE, null);
+ }
+
static void setRaftLogWorkerLogLevel(Level level) {
Log4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, level);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index cf715585e..122b66e58 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -40,6 +40,7 @@ import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
@@ -309,7 +310,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
} else {
LOG.info("Loading snapshot {}", snapshot);
final long endIndex = snapshot.getIndex();
- try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
+ try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 87dd2ef37..d2ff12e75 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -49,6 +49,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
+
public class TestCacheEviction extends BaseTest {
private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
@@ -56,7 +58,7 @@ public class TestCacheEviction extends BaseTest {
Assert.assertEquals(numSegments, cached.length);
final LogSegmentList segments = new LogSegmentList(JavaUtils.getClassSimpleName(TestCacheEviction.class));
for (int i = 0; i < numSegments; i++) {
- LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, null);
+ LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, MAX_OP_SIZE, null);
if (cached[i]) {
s = Mockito.spy(s);
Mockito.when(s.hasCache()).thenReturn(true);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 6e0af2dab..ee284ae2b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
import static org.apache.ratis.server.raftlog.segmented.LogSegment.getEntrySize;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
import com.codahale.metrics.Timer;
@@ -170,7 +171,7 @@ public class TestLogSegment extends BaseTest {
final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten);
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
- LogSegmentStartEnd.valueOf(0), loadInitial, null, null);
+ LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, loadInitial, null, null);
final int delta = isLastEntryPartiallyWritten? 1: 0;
checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0);
storage.close();
@@ -180,7 +181,7 @@ public class TestLogSegment extends BaseTest {
// load a closed segment (1000-1099)
final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
- LogSegmentStartEnd.valueOf(1000, 1099L), loadInitial, null, null);
+ LogSegmentStartEnd.valueOf(1000, 1099L), MAX_OP_SIZE, loadInitial, null, null);
checkLogSegment(closedSegment, 1000, 1099, false,
closedSegment.getTotalFileSize(), 1);
Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
@@ -189,7 +190,7 @@ public class TestLogSegment extends BaseTest {
@Test
public void testAppendEntries() throws Exception {
final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(null, start, null);
+ LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
long size = SegmentedRaftLogFormat.getHeaderLength();
final long max = 8 * 1024 * 1024;
checkLogSegment(segment, start, start - 1, true, size, 0);
@@ -215,7 +216,7 @@ public class TestLogSegment extends BaseTest {
final File openSegmentFile = prepareLog(true, 0, 100, 0, true);
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
- LogSegmentStartEnd.valueOf(0), true, null, raftLogMetrics);
+ LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, true, null, raftLogMetrics);
checkLogSegment(openSegment, 0, 98, true, openSegmentFile.length(), 0);
storage.close();
@@ -228,7 +229,7 @@ public class TestLogSegment extends BaseTest {
@Test
public void testAppendWithGap() throws Exception {
- LogSegment segment = LogSegment.newOpenSegment(null, 1000, null);
+ LogSegment segment = LogSegment.newOpenSegment(null, 1000, MAX_OP_SIZE, null);
SimpleOperation op = new SimpleOperation("m");
final StateMachineLogEntryProto m = op.getLogEntryContent();
try {
@@ -255,7 +256,7 @@ public class TestLogSegment extends BaseTest {
public void testTruncate() throws Exception {
final long term = 1;
final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(null, start, null);
+ LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
for (int i = 0; i < 100; i++) {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
@@ -298,7 +299,8 @@ public class TestLogSegment extends BaseTest {
new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) {
Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a));
}
- try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
+ try(SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+ file, 0, INVALID_LOG_INDEX, true)) {
LogEntryProto entry = in.nextEntry();
Assert.assertNull(entry);
}
@@ -318,8 +320,8 @@ public class TestLogSegment extends BaseTest {
}
Assert.assertEquals(file.length(),
size + SegmentedRaftLogFormat.getHeaderLength());
- try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0,
- INVALID_LOG_INDEX, true)) {
+ try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+ file, 0, INVALID_LOG_INDEX, true)) {
LogEntryProto entry = in.nextEntry();
Assert.assertArrayEquals(content,
entry.getStateMachineLogEntry().getLogData().toByteArray());
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 88b5e2f48..e79f9f7f9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -75,7 +75,8 @@ public class TestRaftLogReadWrite extends BaseTest {
private LogEntryProto[] readLog(File file, long startIndex, long endIndex,
boolean isOpen) throws IOException {
List<LogEntryProto> list = new ArrayList<>();
- try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, startIndex, endIndex, isOpen)) {
+ try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+ file, startIndex, endIndex, isOpen)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
list.add(entry);
@@ -207,8 +208,8 @@ public class TestRaftLogReadWrite extends BaseTest {
}
List<LogEntryProto> list = new ArrayList<>();
- try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(openSegment, 0,
- RaftLog.INVALID_LOG_INDEX, true)) {
+ try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+ openSegment, 0, RaftLog.INVALID_LOG_INDEX, true)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
list.add(entry);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 976e9d6e4..7c5229e5d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.raftlog.segmented;
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
import java.io.IOException;
import java.util.Iterator;
@@ -59,7 +60,7 @@ public class TestSegmentedRaftLogCache {
}
private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
- LogSegment s = LogSegment.newOpenSegment(null, start, null);
+ LogSegment s = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
diff --git a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
index 250a4790a..564ce0bf0 100644
--- a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
+++ b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
@@ -24,6 +24,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.server.raftlog.segmented.LogSegment;
+import org.apache.ratis.util.SizeInBytes;
import java.io.File;
import java.io.IOException;
@@ -33,15 +34,17 @@ public final class ParseRatisLog {
private final File file;
private final Function<StateMachineLogEntryProto, String> smLogToString;
+ private final SizeInBytes maxOpSize;
private long numConfEntries;
private long numMetadataEntries;
private long numStateMachineEntries;
private long numInvalidEntries;
- private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString) {
+ private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString, SizeInBytes maxOpSize) {
this.file = f;
this.smLogToString = smLogToString;
+ this.maxOpSize = maxOpSize;
this.numConfEntries = 0;
this.numMetadataEntries = 0;
this.numStateMachineEntries = 0;
@@ -56,7 +59,7 @@ public final class ParseRatisLog {
}
System.out.println("Processing Raft Log file: " + file.getAbsolutePath() + " size:" + file.length());
- final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(),
+ final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(), maxOpSize,
RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, this::processLogEntry);
System.out.println("Num Total Entries: " + entryCount);
System.out.println("Num Conf Entries: " + numConfEntries);
@@ -85,7 +88,12 @@ public final class ParseRatisLog {
public static class Builder {
private File file = null;
private Function<StateMachineLogEntryProto, String> smLogToString = null;
+ private SizeInBytes maxOpSize = SizeInBytes.valueOf("32MB");
+ public Builder setMaxOpSize(SizeInBytes maxOpSize) {
+ this.maxOpSize = maxOpSize;
+ return this;
+ }
public Builder setSegmentFile(File segmentFile) {
this.file = segmentFile;
@@ -98,7 +106,7 @@ public final class ParseRatisLog {
}
public ParseRatisLog build() {
- return new ParseRatisLog(file, smLogToString);
+ return new ParseRatisLog(file, smLogToString, maxOpSize);
}
}
}