You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/18 00:57:49 UTC
[incubator-ratis] branch master updated: RATIS-1246. Move out LogPathAndIndex from RaftStorageDirectory. (#361)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e19894d RATIS-1246. Move out LogPathAndIndex from RaftStorageDirectory. (#361)
e19894d is described below
commit e19894d8097ca075ea76bdb3504ad51507686b08
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Dec 18 08:57:40 2020 +0800
RATIS-1246. Move out LogPathAndIndex from RaftStorageDirectory. (#361)
---
.../examples/arithmetic/TestArithmeticLogDump.java | 5 +-
.../ratis/server/raftlog/segmented/LogSegment.java | 44 ++++---
.../server/raftlog/segmented/LogSegmentPath.java | 133 ++++++++++++++++++++
.../raftlog/segmented/LogSegmentStartEnd.java | 136 +++++++++++++++++++++
.../server/raftlog/segmented/SegmentedRaftLog.java | 12 +-
.../raftlog/segmented/SegmentedRaftLogCache.java | 16 ++-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 38 ++----
.../ratis/server/storage/RaftStorageDirectory.java | 94 +-------------
.../ratis/InstallSnapshotNotificationTests.java | 11 +-
.../ratis/statemachine/RaftSnapshotBaseTest.java | 12 +-
.../apache/ratis/server/ServerRestartTests.java | 8 +-
.../server/raftlog/segmented/TestLogSegment.java | 25 ++--
.../raftlog/segmented/TestRaftLogReadWrite.java | 10 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 9 +-
.../java/org/apache/ratis/tools/ParseRatisLog.java | 6 +-
15 files changed, 362 insertions(+), 197 deletions(-)
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java
index 4bc0901..b882679 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java
@@ -28,8 +28,8 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.tools.ParseRatisLog;
@@ -76,8 +76,7 @@ public class TestArithmeticLogDump extends BaseTest {
@Test
public void testLogDump() throws Exception {
final RaftServer.Division leaderServer = RaftTestUtil.waitForLeader(cluster);
- List<RaftStorageDirectory.LogPathAndIndex> files =
- leaderServer.getRaftStorage().getStorageDir().getLogSegmentFiles();
+ final List<LogSegmentPath> files = LogSegmentPath.getLogSegmentPaths(leaderServer.getRaftStorage());
Assert.assertEquals(1, files.size());
cluster.shutdown();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 01ffe44..99771fe 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -116,10 +116,21 @@ public class LogSegment implements Comparable<Long> {
return new LogSegment(storage, false, start, end, raftLogMetrics);
}
- public static int readSegmentFile(File file, long start, long end,
- boolean isOpen, CorruptionPolicy corruptionPolicy,
- RaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer) throws
- IOException {
+ static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, RaftLogMetrics metrics) {
+ return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), metrics)
+ : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), metrics);
+ }
+
+ public static int readSegmentFile(File file, LogSegmentStartEnd startEnd,
+ CorruptionPolicy corruptionPolicy, RaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
+ throws IOException {
+ return readSegmentFile(file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(),
+ corruptionPolicy, raftLogMetrics, entryConsumer);
+ }
+
+ private static int readSegmentFile(File file, long start, long end, boolean isOpen,
+ CorruptionPolicy corruptionPolicy, RaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
+ throws IOException {
int count = 0;
try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen, raftLogMetrics)) {
for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
@@ -149,16 +160,13 @@ public class LogSegment implements Comparable<Long> {
return count;
}
- @SuppressWarnings("parameternumber")
- static LogSegment loadSegment(RaftStorage storage, File file, long start, long end, boolean isOpen,
+ static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd,
boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, RaftLogMetrics raftLogMetrics)
throws IOException {
- final LogSegment segment = isOpen ?
- LogSegment.newOpenSegment(storage, start, raftLogMetrics) :
- LogSegment.newCloseSegment(storage, start, end, raftLogMetrics);
-
+ final LogSegment segment = newLogSegment(storage, startEnd, raftLogMetrics);
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
- final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, raftLogMetrics, entry -> {
+ final boolean isOpen = startEnd.isOpen();
+ final int entryCount = readSegmentFile(file, startEnd, corruptionPolicy, raftLogMetrics, entry -> {
segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
if (logConsumer != null) {
logConsumer.accept(entry);
@@ -166,10 +174,8 @@ public class LogSegment implements Comparable<Long> {
});
LOG.info("Successfully read {} entries from segment file {}", entryCount, file);
- if (isOpen) {
- end = segment.getEndIndex();
- }
-
+ final long start = startEnd.getStartIndex();
+ final long end = isOpen? segment.getEndIndex(): startEnd.getEndIndex();
final int expectedEntryCount = Math.toIntExact(end - start + 1);
final boolean corrupted = entryCount != expectedEntryCount;
if (corrupted) {
@@ -227,7 +233,7 @@ public class LogSegment implements Comparable<Long> {
@Override
public LogEntryProto load(LogRecord key) throws IOException {
- final File file = getSegmentFile();
+ final File file = getFile();
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
@@ -243,10 +249,8 @@ public class LogSegment implements Comparable<Long> {
}
}
- private File getSegmentFile() {
- return isOpen ?
- storage.getStorageDir().getOpenLogFile(startIndex) :
- storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
+ File getFile() {
+ return LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen).getFile(storage);
}
private volatile boolean isOpen;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegmentPath.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegmentPath.java
new file mode 100644
index 0000000..a3a6aa9
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegmentPath.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.raftlog.segmented;
+
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+
+/**
+ * {@link LogSegmentStartEnd} with a {@link Path}.
+ *
+ * This is a value-based class.
+ */
+public final class LogSegmentPath implements Comparable<LogSegmentPath> {
+ static final Logger LOG = LoggerFactory.getLogger(LogSegmentPath.class);
+
+ private final Path path;
+ private final LogSegmentStartEnd startEnd;
+
+ private LogSegmentPath(Path path, long startIndex, Long endIndex) {
+ this.path = path;
+ this.startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex);
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public LogSegmentStartEnd getStartEnd() {
+ return startEnd;
+ }
+
+ @Override
+ public int compareTo(LogSegmentPath that) {
+ return Comparator.comparing(LogSegmentPath::getStartEnd).compare(this, that);
+ }
+
+ @Override
+ public String toString() {
+ return path+ "(" + startEnd + ")";
+ }
+
+ /**
+ * Get a list of {@link LogSegmentPath} from the given storage.
+ *
+ * @return a list of log segment paths sorted by the indices.
+ */
+ public static List<LogSegmentPath> getLogSegmentPaths(RaftStorage storage) throws IOException {
+ return getLogSegmentPaths(storage.getStorageDir().getCurrentDir().toPath());
+ }
+
+ private static List<LogSegmentPath> getLogSegmentPaths(Path dir) throws IOException {
+ final List<LogSegmentPath> list = new ArrayList<>();
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
+ for (Path path : stream) {
+ Optional.ofNullable(matchLogSegment(path)).ifPresent(list::add);
+ }
+ }
+ list.sort(Comparator.naturalOrder());
+ return list;
+ }
+
+ /**
+ * Match the given path with the {@link LogSegmentStartEnd#getClosedSegmentPattern()}
+ * or the {@link LogSegmentStartEnd#getOpenSegmentPattern()}.
+ *
+ * Note that if the path is a zero size open segment, this method will try to delete it.
+ *
+ * @return the log segment file matching the given path.
+ */
+ public static LogSegmentPath matchLogSegment(Path path) {
+ return Optional.ofNullable(matchCloseSegment(path)).orElseGet(() -> matchOpenSegment(path));
+ }
+
+ private static LogSegmentPath matchCloseSegment(Path path) {
+ final Matcher matcher = LogSegmentStartEnd.getClosedSegmentPattern().matcher(path.getFileName().toString());
+ if (matcher.matches()) {
+ Preconditions.assertTrue(matcher.groupCount() == 2);
+ return newInstance(path, matcher.group(1), matcher.group(2));
+ }
+ return null;
+ }
+
+ private static LogSegmentPath matchOpenSegment(Path path) {
+ final Matcher matcher = LogSegmentStartEnd.getOpenSegmentPattern().matcher(path.getFileName().toString());
+ if (matcher.matches()) {
+ if (path.toFile().length() > 0L) {
+ return newInstance(path, matcher.group(1), null);
+ }
+
+ LOG.info("Found zero size open segment file " + path);
+ try {
+ Files.delete(path);
+ LOG.info("Deleted zero size open segment file " + path);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete zero size open segment file " + path + ": " + e);
+ }
+ }
+ return null;
+ }
+
+ private static LogSegmentPath newInstance(Path path, String startIndexString, String endIndexString) {
+ final long startIndex = Long.parseLong(startIndexString);
+ final Long endIndex = Optional.ofNullable(endIndexString).map(Long::parseLong).orElse(null);
+ return new LogSegmentPath(path, startIndex, endIndex);
+ }
+}
\ No newline at end of file
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegmentStartEnd.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegmentStartEnd.java
new file mode 100644
index 0000000..6fbaeab
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegmentStartEnd.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.raftlog.segmented;
+
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * The start index and an end index of a log segment.
+ *
+ * This is a value-based class.
+ */
+public final class LogSegmentStartEnd implements Comparable<LogSegmentStartEnd> {
+ private static final String LOG_FILE_NAME_PREFIX = "log";
+ private static final String IN_PROGRESS = "inprogress";
+ private static final Pattern CLOSED_SEGMENT_PATTERN;
+ private static final Pattern OPEN_SEGMENT_PATTERN;
+
+ static {
+ final String digits = "(\\d+)";
+ CLOSED_SEGMENT_PATTERN = Pattern.compile(LOG_FILE_NAME_PREFIX + "_" + digits + "-" + digits);
+ OPEN_SEGMENT_PATTERN = Pattern.compile(LOG_FILE_NAME_PREFIX + "_" + IN_PROGRESS + "_" + digits + "(?:\\..*)?");
+ }
+
+ private static String getOpenLogFileName(long startIndex) {
+ return LOG_FILE_NAME_PREFIX + "_" + IN_PROGRESS + "_" + startIndex;
+ }
+
+ static Pattern getOpenSegmentPattern() {
+ return OPEN_SEGMENT_PATTERN;
+ }
+
+ private static String getClosedLogFileName(long startIndex, long endIndex) {
+ return LOG_FILE_NAME_PREFIX + "_" + startIndex + "-" + endIndex;
+ }
+
+ static Pattern getClosedSegmentPattern() {
+ return CLOSED_SEGMENT_PATTERN;
+ }
+
+ static LogSegmentStartEnd valueOf(long startIndex) {
+ return new LogSegmentStartEnd(startIndex, null);
+ }
+
+ static LogSegmentStartEnd valueOf(long startIndex, Long endIndex) {
+ return new LogSegmentStartEnd(startIndex, endIndex);
+ }
+
+ static LogSegmentStartEnd valueOf(long startIndex, long endIndex, boolean isOpen) {
+ return new LogSegmentStartEnd(startIndex, isOpen? null: endIndex);
+ }
+
+ private final long startIndex;
+ private final Long endIndex;
+
+ private LogSegmentStartEnd(long startIndex, Long endIndex) {
+ Preconditions.assertTrue(startIndex >= RaftLog.LEAST_VALID_LOG_INDEX);
+ Preconditions.assertTrue(endIndex == null || endIndex >= startIndex);
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ }
+
+ public long getStartIndex() {
+ return startIndex;
+ }
+
+ public long getEndIndex() {
+ return Optional.ofNullable(endIndex).orElse(RaftLog.INVALID_LOG_INDEX);
+ }
+
+ public boolean isOpen() {
+ return endIndex == null;
+ }
+
+ private String getFileName() {
+ return isOpen()? getOpenLogFileName(startIndex): getClosedLogFileName(startIndex, endIndex);
+ }
+
+ File getFile(File dir) {
+ return new File(dir, getFileName());
+ }
+
+ File getFile(RaftStorage storage) {
+ return getFile(storage.getStorageDir().getCurrentDir());
+ }
+
+ @Override
+ public int compareTo(LogSegmentStartEnd that) {
+ return Comparator.comparingLong(LogSegmentStartEnd::getStartIndex)
+ .thenComparingLong(LogSegmentStartEnd::getEndIndex)
+ .compare(this, that);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final LogSegmentStartEnd that = (LogSegmentStartEnd) obj;
+ return startIndex == that.startIndex && Objects.equals(endIndex, that.endIndex);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(startIndex, endIndex);
+ }
+
+ @Override
+ public String toString() {
+ return startIndex + "-" + Optional.ofNullable(endIndex).map(Object::toString).orElse("");
+ }
+}
\ No newline at end of file
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 604eae5..56bd0a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -29,7 +29,6 @@ import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
-import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -202,12 +201,7 @@ public class SegmentedRaftLog extends RaftLog {
@Override
protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
loadLogSegments(lastIndexInSnapshot, consumer);
- File openSegmentFile = null;
- LogSegment openSegment = cache.getOpenSegment();
- if (openSegment != null) {
- openSegmentFile = storage.getStorageDir()
- .getOpenLogFile(openSegment.getStartIndex());
- }
+ final File openSegmentFile = Optional.ofNullable(cache.getOpenSegment()).map(LogSegment::getFile).orElse(null);
fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot),
Math.min(cache.getLastIndexInClosedSegments(), lastIndexInSnapshot),
openSegmentFile);
@@ -221,9 +215,9 @@ public class SegmentedRaftLog extends RaftLog {
private void loadLogSegments(long lastIndexInSnapshot,
Consumer<LogEntryProto> logConsumer) throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
- List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
+ final List<LogSegmentPath> paths = LogSegmentPath.getLogSegmentPaths(storage);
int i = 0;
- for (LogPathAndIndex pi : paths) {
+ for (LogSegmentPath pi : paths) {
// During the initial loading, we can only confirm the committed
// index based on the snapshot. This means if a log segment is not kept
// in cache after the initial loading, later we have to load its content
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index ea04f3c..603fd34 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -27,7 +27,6 @@ import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
-import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AutoCloseableReadWriteLock;
import org.apache.ratis.util.JavaUtils;
@@ -35,6 +34,7 @@ import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.function.Consumer;
@@ -88,6 +88,14 @@ public class SegmentedRaftLogCache {
this.newEndIndex = newEndIndex;
}
+ File getFile(RaftStorage storage) {
+ return LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen).getFile(storage);
+ }
+
+ File getNewFile(RaftStorage storage) {
+ return LogSegmentStartEnd.valueOf(startIndex, newEndIndex, false).getFile(storage);
+ }
+
@Override
public String toString() {
return "(" + startIndex + ", " + endIndex
@@ -372,10 +380,10 @@ public class SegmentedRaftLogCache {
return maxCachedSegments;
}
- void loadSegment(LogPathAndIndex pi, boolean keepEntryInCache,
+ void loadSegment(LogSegmentPath pi, boolean keepEntryInCache,
Consumer<LogEntryProto> logConsumer) throws IOException {
- LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(),
- pi.getStartIndex(), pi.getEndIndex(), pi.isOpen(), keepEntryInCache, logConsumer, raftLogMetrics);
+ final LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), pi.getStartEnd(),
+ keepEntryInCache, logConsumer, raftLogMetrics);
if (logSegment != null) {
addSegment(logSegment);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 5c238bc..93996c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -416,16 +416,14 @@ class SegmentedRaftLogWorker {
}
Task purge(TruncationSegments ts) {
- return addIOTask(new PurgeLog(ts, storage));
+ return addIOTask(new PurgeLog(ts));
}
private final class PurgeLog extends Task {
private final TruncationSegments segments;
- private final RaftStorage storage;
- private PurgeLog(TruncationSegments segments, RaftStorage storage) {
+ private PurgeLog(TruncationSegments segments) {
this.segments = segments;
- this.storage = storage;
}
@Override
@@ -433,9 +431,7 @@ class SegmentedRaftLogWorker {
if (segments.getToDelete() != null) {
Timer.Context purgeLogContext = raftLogMetrics.getRaftLogPurgeTimer().time();
for (SegmentFileInfo fileInfo : segments.getToDelete()) {
- File delFile = storage.getStorageDir()
- .getClosedLogFile(fileInfo.getStartIndex(), fileInfo.getEndIndex());
- FileUtils.deleteFile(delFile);
+ FileUtils.deleteFile(fileInfo.getFile(storage));
}
purgeLogContext.stop();
}
@@ -530,6 +526,10 @@ class SegmentedRaftLogWorker {
}
}
+ File getFile(long startIndex, Long endIndex) {
+ return LogSegmentStartEnd.valueOf(startIndex, endIndex).getFile(storage);
+ }
+
private class FinalizeLogSegment extends Task {
private final long startIndex;
private final long endIndex;
@@ -544,12 +544,12 @@ class SegmentedRaftLogWorker {
public void execute() throws IOException {
freeSegmentedRaftLogOutputStream();
- File openFile = storage.getStorageDir().getOpenLogFile(startIndex);
+ final File openFile = getFile(startIndex, null);
Preconditions.assertTrue(openFile.exists(),
() -> name + ": File " + openFile + " to be rolled does not exist");
if (endIndex - startIndex + 1 > 0) {
// finalize the current open segment
- File dstFile = storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
+ final File dstFile = getFile(startIndex, endIndex);
Preconditions.assertTrue(!dstFile.exists());
FileUtils.move(openFile, dstFile);
@@ -589,7 +589,7 @@ class SegmentedRaftLogWorker {
@Override
void execute() throws IOException {
- File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
+ final File openFile = getFile(newStartIndex, null);
Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s",
openFile, name);
Preconditions.assertTrue(pendingFlushNum == 0);
@@ -626,19 +626,13 @@ class SegmentedRaftLogWorker {
freeSegmentedRaftLogOutputStream();
if (segments.getToTruncate() != null) {
- File fileToTruncate = segments.getToTruncate().isOpen() ?
- storage.getStorageDir().getOpenLogFile(
- segments.getToTruncate().getStartIndex()) :
- storage.getStorageDir().getClosedLogFile(
- segments.getToTruncate().getStartIndex(),
- segments.getToTruncate().getEndIndex());
+ final File fileToTruncate = segments.getToTruncate().getFile(storage);
Preconditions.assertTrue(fileToTruncate.exists(),
"File %s to be truncated does not exist", fileToTruncate);
FileUtils.truncateFile(fileToTruncate, segments.getToTruncate().getTargetLength());
// rename the file
- File dstFile = storage.getStorageDir().getClosedLogFile(
- segments.getToTruncate().getStartIndex(), segments.getToTruncate().getNewEndIndex());
+ final File dstFile = segments.getToTruncate().getNewFile(storage);
Preconditions.assertTrue(!dstFile.exists(),
"Truncated file %s already exists ", dstFile);
FileUtils.move(fileToTruncate, dstFile);
@@ -651,13 +645,7 @@ class SegmentedRaftLogWorker {
if (segments.getToDelete() != null && segments.getToDelete().length > 0) {
long minStart = segments.getToDelete()[0].getStartIndex();
for (SegmentFileInfo del : segments.getToDelete()) {
- final File delFile;
- if (del.isOpen()) {
- delFile = storage.getStorageDir().getOpenLogFile(del.getStartIndex());
- } else {
- delFile = storage.getStorageDir()
- .getClosedLogFile(del.getStartIndex(), del.getEndIndex());
- }
+ final File delFile = del.getFile(storage);
Preconditions.assertTrue(delFile.exists(),
"File %s to be deleted does not exist", delFile);
FileUtils.deleteFile(delFile);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
index 39213e5..0f8e1df 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.server.storage;
-import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.AtomicFileOutputStream;
import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
@@ -34,8 +33,6 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import static java.nio.file.Files.newDirectoryStream;
@@ -45,54 +42,16 @@ public class RaftStorageDirectory {
static final String STORAGE_DIR_CURRENT = "current";
static final String STORAGE_FILE_LOCK = "in_use.lock";
static final String META_FILE_NAME = "raft-meta";
- static final String LOG_FILE_INPROGRESS = "inprogress";
- static final String LOG_FILE_PREFIX = "log";
static final String STATE_MACHINE = "sm"; // directory containing state machine snapshots
static final String TEMP = "tmp";
- static final Pattern CLOSED_SEGMENT_REGEX = Pattern.compile("log_(\\d+)-(\\d+)");
- static final Pattern OPEN_SEGMENT_REGEX = Pattern.compile("log_inprogress_(\\d+)(?:\\..*)?");
private static final String CONF_EXTENSION = ".conf";
-
enum StorageState {
NON_EXISTENT,
NOT_FORMATTED,
NORMAL
}
- public static class LogPathAndIndex {
- private final Path path;
- private final long startIndex;
- private final long endIndex;
-
- LogPathAndIndex(Path path, long startIndex, long endIndex) {
- this.path = path;
- this.startIndex = startIndex;
- this.endIndex = endIndex;
- }
-
- public long getStartIndex() {
- return startIndex;
- }
-
- public long getEndIndex() {
- return endIndex;
- }
-
- public Path getPath() {
- return path;
- }
-
- public boolean isOpen() {
- return endIndex == RaftLog.INVALID_LOG_INDEX;
- }
-
- @Override
- public String toString() {
- return path + "-" + startIndex + "-" + endIndex;
- }
- }
-
private final File root; // root directory
private FileLock lock; // storage lock
@@ -143,7 +102,7 @@ public class RaftStorageDirectory {
*
* @return the directory path
*/
- File getCurrentDir() {
+ public File getCurrentDir() {
return new File(root, STORAGE_DIR_CURRENT);
}
@@ -160,22 +119,6 @@ public class RaftStorageDirectory {
return new File(getCurrentDir(), META_FILE_NAME + CONF_EXTENSION);
}
- public File getOpenLogFile(long startIndex) {
- return new File(getCurrentDir(), getOpenLogFileName(startIndex));
- }
-
- static String getOpenLogFileName(long startIndex) {
- return LOG_FILE_PREFIX + "_" + LOG_FILE_INPROGRESS + "_" + startIndex;
- }
-
- public File getClosedLogFile(long startIndex, long endIndex) {
- return new File(getCurrentDir(), getClosedLogFileName(startIndex, endIndex));
- }
-
- static String getClosedLogFileName(long startIndex, long endIndex) {
- return LOG_FILE_PREFIX + "_" + startIndex + "-" + endIndex;
- }
-
public File getStateMachineDir() {
return new File(getRoot(), STATE_MACHINE);
}
@@ -192,41 +135,6 @@ public class RaftStorageDirectory {
return p;
}
- /**
- * @return log segment files sorted based on their index.
- */
- public List<LogPathAndIndex> getLogSegmentFiles() throws IOException {
- List<LogPathAndIndex> list = new ArrayList<>();
- try (DirectoryStream<Path> stream =
- Files.newDirectoryStream(getCurrentDir().toPath())) {
- for (Path path : stream) {
- LogPathAndIndex lpi = processOnePath(path);
- if (lpi != null) {
- list.add(lpi);
- }
- }
- }
- list.sort(Comparator.comparingLong(o -> o.startIndex));
- return list;
- }
-
- public static LogPathAndIndex processOnePath(Path path) throws IOException {
- for (Pattern pattern : Arrays.asList(CLOSED_SEGMENT_REGEX, OPEN_SEGMENT_REGEX)) {
- Matcher matcher = pattern.matcher(path.getFileName().toString());
- if (matcher.matches()) {
- if (pattern == OPEN_SEGMENT_REGEX && Files.size(path) == 0L) {
- Files.delete(path);
- LOG.info("Delete zero size file " + path);
- return null;
- }
- final long startIndex = Long.parseLong(matcher.group(1));
- final long endIndex = matcher.groupCount() == 2 ?
- Long.parseLong(matcher.group(2)) : RaftLog.INVALID_LOG_INDEX;
- return new LogPathAndIndex(path, startIndex, endIndex);
- }
- }
- return null;
- }
/**
* Check to see if current/ directory is empty.
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index ff982c6..c6bedea 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -29,7 +29,8 @@ import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -115,7 +116,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
private void testAddNewFollowers(CLUSTER cluster) throws Exception {
leaderSnapshotInfoRef.set(null);
- final List<RaftStorageDirectory.LogPathAndIndex> logs;
+ final List<LogSegmentPath> logs;
int i = 0;
try {
RaftTestUtil.waitForLeader(cluster);
@@ -131,8 +132,6 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
// wait for the snapshot to be done
final RaftServer.Division leader = cluster.getLeader();
- final RaftStorageDirectory storageDirectory = leader.getRaftStorage().getStorageDir();
-
final long nextIndex = leader.getRaftLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
@@ -141,14 +140,14 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
return null;
}, 10, ONE_SECOND, "snapshotFile.exist", LOG);
- logs = storageDirectory.getLogSegmentFiles();
+ logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
} finally {
cluster.shutdown();
}
// delete the log segments from the leader
LOG.info("Delete logs {}", logs);
- for (RaftStorageDirectory.LogPathAndIndex path : logs) {
+ for (LogSegmentPath path : logs) {
FileUtils.deleteFully(path.getPath()); // the log may already be purged
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index b5a5f31..a09c6d2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -39,8 +39,8 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
-import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.FileUtils;
@@ -184,7 +184,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
*/
@Test
public void testBasicInstallSnapshot() throws Exception {
- final List<LogPathAndIndex> logs;
+ final List<LogSegmentPath> logs;
int i = 0;
try {
RaftTestUtil.waitForLeader(cluster);
@@ -198,8 +198,6 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
// wait for the snapshot to be done
- final RaftStorageDirectory storageDirectory = cluster.getLeader().getRaftStorage().getStorageDir();
-
final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
@@ -208,13 +206,13 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
return null;
}, 10, ONE_SECOND, "snapshotFile.exist", LOG);
verifyTakeSnapshotMetric(cluster.getLeader());
- logs = storageDirectory.getLogSegmentFiles();
+ logs = LogSegmentPath.getLogSegmentPaths(cluster.getLeader().getRaftStorage());
} finally {
cluster.shutdown();
}
// delete the log segments from the leader
- for (LogPathAndIndex path : logs) {
+ for (LogSegmentPath path : logs) {
FileUtils.delete(path.getPath());
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 8f1b1bf..d31dc7d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -37,7 +37,7 @@ import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogFormat;
import org.apache.ratis.server.RaftServerConfigKeys.Log;
import org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog;
-import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.FileUtils;
@@ -169,9 +169,9 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
}
static List<Path> getOpenLogFiles(RaftServer.Division server) throws Exception {
- return server.getRaftStorage().getStorageDir().getLogSegmentFiles().stream()
- .filter(LogPathAndIndex::isOpen)
- .map(LogPathAndIndex::getPath)
+ return LogSegmentPath.getLogSegmentPaths(server.getRaftStorage()).stream()
+ .filter(p -> p.getStartEnd().isOpen())
+ .map(LogSegmentPath::getPath)
.collect(Collectors.toList());
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 81267cb..cfad2ea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -25,7 +25,6 @@ import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
@@ -89,9 +88,7 @@ public class TestLogSegment extends BaseTest {
Preconditions.assertTrue(!isLastEntryPartiallyWritten, "For closed log, the last entry cannot be partially written.");
}
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- final File file = isOpen ?
- storage.getStorageDir().getOpenLogFile(startIndex) :
- storage.getStorageDir().getClosedLogFile(startIndex, startIndex + numEntries - 1);
+ final File file = LogSegmentStartEnd.valueOf(startIndex, startIndex + numEntries - 1, isOpen).getFile(storage);
final LogEntryProto[] entries = new LogEntryProto[numEntries];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
@@ -170,8 +167,8 @@ public class TestLogSegment extends BaseTest {
// load an open segment
final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten);
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
- INVALID_LOG_INDEX, true, loadInitial, null, null);
+ final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
+ LogSegmentStartEnd.valueOf(0), loadInitial, null, null);
final int delta = isLastEntryPartiallyWritten? 1: 0;
checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0);
storage.close();
@@ -181,7 +178,7 @@ public class TestLogSegment extends BaseTest {
// load a closed segment (1000-1099)
final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
- 1000, 1099, false, loadInitial, null, null);
+ LogSegmentStartEnd.valueOf(1000, 1099L), loadInitial, null, null);
checkLogSegment(closedSegment, 1000, 1099, false,
closedSegment.getTotalFileSize(), 1);
Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
@@ -215,8 +212,8 @@ public class TestLogSegment extends BaseTest {
final File openSegmentFile = prepareLog(true, 0, 100, 0, true);
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
- INVALID_LOG_INDEX, true, true, null, raftLogMetrics);
+ final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
+ LogSegmentStartEnd.valueOf(0), true, null, raftLogMetrics);
checkLogSegment(openSegment, 0, 98, true, openSegmentFile.length(), 0);
storage.close();
@@ -285,7 +282,7 @@ public class TestLogSegment extends BaseTest {
@Test
public void testPreallocateSegment() throws Exception {
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- final File file = storage.getStorageDir().getOpenLogFile(0);
+ final File file = LogSegmentStartEnd.valueOf(0).getFile(storage);
final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1, 1024 * 1024,
1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024,
2 * 1024 * 1024 + 1, 8 * 1024 * 1024};
@@ -334,7 +331,7 @@ public class TestLogSegment extends BaseTest {
public void testPreallocationAndAppend() throws Exception {
final SizeInBytes max = SizeInBytes.valueOf(2, TraditionalBinaryPrefix.MEGA);
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- final File file = storage.getStorageDir().getOpenLogFile(0);
+ final File file = LogSegmentStartEnd.valueOf(0).getFile(storage);
final byte[] content = new byte[1024];
Arrays.fill(content, (byte) 1);
@@ -364,7 +361,7 @@ public class TestLogSegment extends BaseTest {
@Test
public void testZeroSizeInProgressFile() throws Exception {
final RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- final File file = storage.getStorageDir().getOpenLogFile(0);
+ final File file = LogSegmentStartEnd.valueOf(0).getFile(storage);
storage.close();
// create zero size in-progress file
@@ -374,8 +371,8 @@ public class TestLogSegment extends BaseTest {
Assert.assertTrue(Files.exists(path));
Assert.assertEquals(0, Files.size(path));
- // getLogSegmentFiles should remove it.
- final List<RaftStorageDirectory.LogPathAndIndex> logs = storage.getStorageDir().getLogSegmentFiles();
+ // getLogSegmentPaths should remove it.
+ final List<LogSegmentPath> logs = LogSegmentPath.getLogSegmentPaths(storage);
Assert.assertEquals(0, logs.size());
Assert.assertFalse(Files.exists(path));
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index f543ed0..88b5e2f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -103,7 +103,7 @@ public class TestRaftLogReadWrite extends BaseTest {
@Test
public void testReadWriteLog() throws IOException {
final RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
+ final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
long size = SegmentedRaftLogFormat.getHeaderLength();
final LogEntryProto[] entries = new LogEntryProto[100];
@@ -123,7 +123,7 @@ public class TestRaftLogReadWrite extends BaseTest {
@Test
public void testAppendLog() throws IOException {
final RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
+ final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
LogEntryProto[] entries = new LogEntryProto[200];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
@@ -156,7 +156,7 @@ public class TestRaftLogReadWrite extends BaseTest {
@Test
public void testReadWithPadding() throws IOException {
final RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
+ final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
long size = SegmentedRaftLogFormat.getHeaderLength();
LogEntryProto[] entries = new LogEntryProto[100];
@@ -185,7 +185,7 @@ public class TestRaftLogReadWrite extends BaseTest {
@Test
public void testReadWithCorruptPadding() throws IOException {
final RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
+ final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
LogEntryProto[] entries = new LogEntryProto[10];
final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
@@ -234,7 +234,7 @@ public class TestRaftLogReadWrite extends BaseTest {
@Test
public void testReadWithEntryCorruption() throws IOException {
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
+ final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (int i = 0; i < 100; i++) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 8b4b77d..b150e38 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -95,6 +95,10 @@ public class TestSegmentedRaftLog extends BaseTest {
this.term = term;
this.isOpen = isOpen;
}
+
+ File getFile(RaftStorage storage) {
+ return LogSegmentStartEnd.valueOf(start, end, isOpen).getFile(storage);
+ }
}
private File storageDir;
@@ -147,10 +151,7 @@ public class TestSegmentedRaftLog extends BaseTest {
private LogEntryProto[] prepareLog(List<SegmentRange> list) throws IOException {
List<LogEntryProto> entryList = new ArrayList<>();
for (SegmentRange range : list) {
- File file = range.isOpen ?
- storage.getStorageDir().getOpenLogFile(range.start) :
- storage.getStorageDir().getClosedLogFile(range.start, range.end);
-
+ final File file = range.getFile(storage);
final int size = (int) (range.end - range.start + 1);
LogEntryProto[] entries = new LogEntryProto[size];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
diff --git a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
index e6bc176..250a479 100644
--- a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
+++ b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
@@ -22,8 +22,8 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.raftlog.LogProtoUtils;
+import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.server.raftlog.segmented.LogSegment;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
import java.io.File;
import java.io.IOException;
@@ -49,14 +49,14 @@ public final class ParseRatisLog {
}
public void dumpSegmentFile() throws IOException {
- RaftStorageDirectory.LogPathAndIndex pi = RaftStorageDirectory.processOnePath(file.toPath());
+ final LogSegmentPath pi = LogSegmentPath.matchLogSegment(file.toPath());
if (pi == null) {
System.out.println("Invalid segment file");
return;
}
System.out.println("Processing Raft Log file: " + file.getAbsolutePath() + " size:" + file.length());
- final int entryCount = LogSegment.readSegmentFile(file, pi.getStartIndex(), pi.getEndIndex(), pi.isOpen(),
+ final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(),
RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, this::processLogEntry);
System.out.println("Num Total Entries: " + entryCount);
System.out.println("Num Conf Entries: " + numConfEntries);