You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/21 05:29:27 UTC
[incubator-ratis] branch master updated: RATIS-1255. Move RaftLog
to raft-server-api. (#367)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c714aff RATIS-1255. Move RaftLog to raft-server-api. (#367)
c714aff is described below
commit c714affca4da0637b877a53bc21e21f3583204ae
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 21 13:29:17 2020 +0800
RATIS-1255. Move RaftLog to raft-server-api. (#367)
---
.../org/apache/ratis/server/raftlog/RaftLog.java | 173 +++++++++++++++++++++
.../ratis/server/raftlog/RaftLogSequentialOps.java | 0
.../apache/ratis/server/impl/RaftServerImpl.java | 3 +-
.../org/apache/ratis/server/impl/ServerState.java | 4 +-
.../raftlog/{RaftLog.java => RaftLogBase.java} | 147 +++--------------
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 10 +-
.../server/raftlog/segmented/SegmentedRaftLog.java | 12 +-
.../test/java/org/apache/ratis/RaftAsyncTests.java | 5 +-
.../test/java/org/apache/ratis/RaftTestUtil.java | 3 +-
.../server/impl/RaftReconfigurationBaseTest.java | 8 +-
.../ratis/server/impl/RaftServerTestUtil.java | 4 +
.../ratis/server/storage/RaftStorageTestUtils.java | 4 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 5 +-
13 files changed, 223 insertions(+), 155 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
new file mode 100644
index 0000000..f84d010
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.raftlog;
+
+import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+/**
+ * {@link RaftLog} is a transaction log of a raft service.
+ */
+public interface RaftLog extends RaftLogSequentialOps, Closeable {
+ Logger LOG = LoggerFactory.getLogger(RaftLog.class);
+
+ /** The least valid log index, i.e. the index used when writing to an empty log. */
+ long LEAST_VALID_LOG_INDEX = 0L;
+ /** Invalid log index is used to indicate that the log index is missing. */
+ long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
+
+ /** Does this log contains the given {@link TermIndex}? */
+ default boolean contains(TermIndex ti) {
+ Objects.requireNonNull(ti, "ti == null");
+ return ti.equals(getTermIndex(ti.getIndex()));
+ }
+
+ /**
+ * Is the entry corresponding to the given {@link TermIndex} a configuration entry?
+ * In other words, the corresponding entry exists and
+ * {@link LogEntryProto#hasConfigurationEntry()} returns true.
+ */
+ boolean isConfigEntry(TermIndex ti);
+
+ /**
+ * @return null if the log entry is not found in this log;
+ * otherwise, return the {@link TermIndex} of the log entry corresponding to the given index.
+ */
+ TermIndex getTermIndex(long index);
+
+ /**
+ * @return null if the log entry is not found in this log;
+ * otherwise, return the log entry corresponding to the given index.
+ */
+ LogEntryProto get(long index) throws RaftLogIOException;
+
+ /**
+ * @return null if the log entry is not found in this log;
+ * otherwise, return the {@link EntryWithData} corresponding to the given index.
+ */
+ EntryWithData getEntryWithData(long index) throws RaftLogIOException;
+
+ /**
+ * @param startIndex the starting log index (inclusive)
+ * @param endIndex the ending log index (exclusive)
+ * @return an array of {@link TermIndex} of all log entries within the given index range. Null if
+ * startIndex is greater than the smallest available index.
+ */
+ TermIndex[] getEntries(long startIndex, long endIndex);
+
+ /** @return the index of the starting entry of this log. */
+ long getStartIndex();
+
+ /** @return the index of the next entry to append. */
+ default long getNextIndex() {
+ final TermIndex last = getLastEntryTermIndex();
+ if (last == null) {
+ // if the log is empty, the last committed index should be consistent with
+ // the last index included in the latest snapshot.
+ return getLastCommittedIndex() + 1;
+ }
+ return last.getIndex() + 1;
+ }
+
+ /** @return the index of the last entry that has been committed. */
+ long getLastCommittedIndex();
+
+ /** @return the index of the latest snapshot. */
+ long getSnapshotIndex();
+
+ /** @return the index of the last entry that has been flushed to the local storage. */
+ long getFlushIndex();
+
+ /** @return the {@link TermIndex} of the last log entry. */
+ TermIndex getLastEntryTermIndex();
+
+ /** @return the {@link RaftLogMetrics}. */
+ RaftLogMetrics getRaftLogMetrics();
+
+ /**
+ * Update the commit index.
+ * @param majorityIndex the index that has achieved majority.
+ * @param currentTerm the current term.
+ * @param isLeader Is this server the leader?
+ * @return true if commit index is changed; otherwise, return false.
+ */
+ boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader);
+
+ /**
+ * Update the snapshot index with the given index.
+ * Note that the commit index may also be changed by this update.
+ */
+ void updateSnapshotIndex(long newSnapshotIndex);
+
+ /** Open this log for read and write. */
+ void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException;
+
+ /**
+ * Purge asynchronously the log transactions.
+ * The implementation may choose to purge an index other than the suggested index.
+ *
+ * @param suggestedIndex the suggested index (inclusive) to be purged.
+ * @return the future of the actual purged log index.
+ */
+ CompletableFuture<Long> purge(long suggestedIndex);
+
+ /** Persist the given metadata. */
+ void persistMetadata(RaftStorageMetadata metadata) throws IOException;
+
+ /** Load metadata. */
+ RaftStorageMetadata loadMetadata() throws IOException;
+
+ /**
+ * A snapshot is installed so that the indices and other information of this log must be updated.
+ * This log may also purge the outdated entries.
+ *
+ * @return the future of the actual purged log index (inclusive).
+ */
+ CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex);
+
+ /**
+ * Log entry with state machine data.
+ *
+ * When both {@link LogEntryProto#hasStateMachineLogEntry()} and
+ * {@link StateMachineLogEntryProto#hasStateMachineEntry()} are true,
+ * the {@link StateMachineEntryProto} is removed from the original {@link LogEntryProto}
+ * before appending to this log.
+ * The {@link StateMachineEntryProto} is stored by the state machine but not in this log.
+ * When reading the log entry, this class rebuilds the original {@link LogEntryProto}
+ * containing both the log entry and the state machine data.
+ */
+ interface EntryWithData {
+ /** @return the serialized size including both log entry and state machine data. */
+ int getSerializedSize();
+
+ /** @return the {@link LogEntryProto} containing both the log entry and the state machine data. */
+ LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException;
+ }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
similarity index 100%
rename from ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
rename to ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8939f4f..6bc6499 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -93,6 +93,7 @@ class RaftServerImpl implements RaftServer.Division,
static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
+ static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
class Info implements DivisionInfo {
@Override
@@ -1178,7 +1179,7 @@ class RaftServerImpl implements RaftServer.Division,
commitInfos.forEach(commitInfoCache::update);
if (!isHeartbeat) {
- CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
+ CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
}
return JavaUtils.allOf(futures).whenCompleteAsync(
(r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index fe5e1f4..466eeb2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -222,7 +222,7 @@ class ServerState implements Closeable {
}
void persistMetadata() throws IOException {
- log.writeMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
+ log.persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
}
/**
@@ -413,7 +413,7 @@ class ServerState implements Closeable {
}
void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
- log.syncWithSnapshot(lastTermIndexInSnapshot.getIndex());
+ log.onSnapshotInstalled(lastTermIndexInSnapshot.getIndex());
latestInstalledSnapshot.set(lastTermIndexInSnapshot);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
similarity index 76%
rename from ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
rename to ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 3f65b83..3e06963 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -21,11 +21,9 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
-import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftConfiguration;
-import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
@@ -33,13 +31,9 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
@@ -55,10 +49,7 @@ import java.util.function.LongSupplier;
* 2. Segmented RaftLog: the log entries are persisted on disk, and are stored
* in segments.
*/
-public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
- public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
- public static final String LOG_SYNC = JavaUtils.getClassSimpleName(RaftLog.class) + ".logSync";
-
+public abstract class RaftLogBase implements RaftLog {
private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getName(), s);
private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getName(), s);
@@ -87,7 +78,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
private volatile LogEntryProto lastMetadataEntry = null;
- protected RaftLog(RaftGroupMemberId memberId,
+ protected RaftLogBase(RaftGroupMemberId memberId,
LongSupplier getSnapshotIndexFromStateMachine,
RaftProperties properties) {
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
@@ -102,12 +93,12 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
this.getSnapshotIndexFromStateMachine = getSnapshotIndexFromStateMachine;
}
- public abstract RaftLogMetrics getRaftLogMetrics();
-
+ @Override
public long getLastCommittedIndex() {
return commitIndex.get();
}
+ @Override
public long getSnapshotIndex() {
return snapshotIndex.get();
}
@@ -116,17 +107,12 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
state.assertOpen();
}
+ /** Is this log already opened? */
public boolean isOpened() {
return state.isOpened();
}
- /**
- * Update the last committed index.
- * @param majorityIndex the index that has achieved majority.
- * @param currentTerm the current term.
- * @param isLeader Is this server the leader?
- * @return true if update is applied; otherwise, return false, i.e. no update required.
- */
+ @Override
public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) {
try(AutoCloseableLock writeLock = writeLock()) {
final long oldCommittedIndex = getLastCommittedIndex();
@@ -152,11 +138,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
updateSnapshotIndex(getSnapshotIndexFromStateMachine.getAsLong());
}
- /**
- * Update the last committed index and snapshotIndex with the last index in
- * the snapshot.
- * @param newSnapshotIndex the last index in the snapshot
- */
+ @Override
public void updateSnapshotIndex(long newSnapshotIndex) {
try(AutoCloseableLock writeLock = writeLock()) {
final long oldSnapshotIndex = getSnapshotIndex();
@@ -170,29 +152,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
}
}
- /**
- * Does the log contains the given term and index? Used to check the
- * consistency between the local log of a follower and the log entries sent
- * by the leader.
- */
- public boolean contains(TermIndex ti) {
- Objects.requireNonNull(ti, "ti == null");
- return ti.equals(getTermIndex(ti.getIndex()));
- }
-
- /**
- * @return the index of the next log entry to append.
- */
- public long getNextIndex() {
- final TermIndex last = getLastEntryTermIndex();
- if (last == null) {
- // if the log is empty, the last committed index should be consistent with
- // the last index included in the latest snapshot.
- return getLastCommittedIndex() + 1;
- }
- return last.getIndex() + 1;
- }
-
@Override
public final long append(long term, TransactionContext transaction) throws StateMachineException {
return runner.runSequentially(() -> appendImpl(term, transaction));
@@ -268,6 +227,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
}
return true;
}
+
@Override
public final long append(long term, RaftConfiguration configuration) {
return runner.runSequentially(() -> appendImpl(term, configuration));
@@ -282,6 +242,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
}
}
+ @Override
public final void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
openImpl(lastIndexInSnapshot, e -> {
if (e.hasMetadataEntry()) {
@@ -303,48 +264,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
}
- public abstract long getStartIndex();
-
- /**
- * Get the log entry of the given index.
- *
- * @param index The given index.
- * @return The log entry associated with the given index.
- * Null if there is no log entry with the index.
- */
- public abstract LogEntryProto get(long index) throws RaftLogIOException;
-
- /**
- * Get the log entry of the given index along with the state machine data.
- *
- * @param index The given index.
- * @return The log entry associated with the given index.
- * Null if there is no log entry with the index.
- */
- public abstract EntryWithData getEntryWithData(long index) throws RaftLogIOException;
-
- /**
- * Get the TermIndex information of the given index.
- *
- * @param index The given index.
- * @return The TermIndex of the log entry associated with the given index.
- * Null if there is no log entry with the index.
- */
- public abstract TermIndex getTermIndex(long index);
-
- /**
- * @param startIndex the starting log index (inclusive)
- * @param endIndex the ending log index (exclusive)
- * @return TermIndex of all log entries within the given index range. Null if
- * startIndex is greater than the smallest available index.
- */
- public abstract TermIndex[] getEntries(long startIndex, long endIndex);
-
- /**
- * @return the last log entry's term and index.
- */
- public abstract TermIndex getLastEntryTermIndex();
-
/**
* Validate the term and index of entry w.r.t RaftLog
*/
@@ -378,13 +297,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
protected abstract CompletableFuture<Long> truncateImpl(long index);
- /**
- * Purge asynchronously the log transactions.
- * The implementation may choose to purge an index other than the suggested index.
- *
- * @param suggestedIndex the suggested index (inclusive) to be purged.
- * @return the future of the actual purged log index.
- */
+ @Override
public final CompletableFuture<Long> purge(long suggestedIndex) {
final long lastPurge = purgeIndex.get();
if (suggestedIndex - lastPurge < purgeGap) {
@@ -417,30 +330,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
protected abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries);
- /**
- * @return the index of the last entry that has been flushed to the local storage.
- */
- public abstract long getFlushIndex();
-
- /**
- * Write and flush the metadata (votedFor and term) into the meta file.
- *
- * We need to guarantee that the order of writeMetadata calls is the same with
- * that when we change the in-memory term/votedFor. Otherwise we may persist
- * stale term/votedFor in file.
- *
- * Since the leader change is not frequent, currently we simply put this call
- * in the RaftPeer's lock. Later we can use an IO task queue to enforce the
- * order.
- */
- public abstract void writeMetadata(RaftStorageMetadata metadata) throws IOException;
-
- public abstract RaftStorageMetadata loadMetadata() throws IOException;
-
- public abstract CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex);
-
- public abstract boolean isConfigEntry(TermIndex ti);
-
@Override
public String toString() {
return getName() + ":" + state + ":c" + getLastCommittedIndex();
@@ -471,26 +360,28 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
return name;
}
+ protected EntryWithData newEntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
+ return new EntryWithDataImpl(logEntry, future);
+ }
+
/**
* Holds proto entry along with future which contains read state machine data
*/
- public class EntryWithData {
+ class EntryWithDataImpl implements EntryWithData {
private final LogEntryProto logEntry;
private final CompletableFuture<ByteString> future;
- public EntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
+ EntryWithDataImpl(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
this.logEntry = logEntry;
this.future = future;
}
- public long getIndex() {
- return logEntry.getIndex();
- }
-
+ @Override
public int getSerializedSize() {
return LogProtoUtils.getSerializedSize(logEntry);
}
+ @Override
public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
LogEntryProto entryProto;
if (future == null) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 9dbf628..7fa91bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -22,7 +22,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.server.metrics.RaftLogMetricsBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
@@ -37,7 +37,7 @@ import java.util.function.LongSupplier;
/**
* A simple RaftLog implementation in memory. Used only for testing.
*/
-public class MemoryRaftLog extends RaftLog {
+public class MemoryRaftLog extends RaftLogBase {
static class EntryList {
private final List<LogEntryProto> entries = new ArrayList<>();
@@ -96,7 +96,7 @@ public class MemoryRaftLog extends RaftLog {
@Override
public EntryWithData getEntryWithData(long index) {
- return new EntryWithData(get(index), null);
+ return newEntryWithData(get(index), null);
}
@Override
@@ -217,7 +217,7 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public void writeMetadata(RaftStorageMetadata newMetadata) {
+ public void persistMetadata(RaftStorageMetadata newMetadata) {
metadata.set(newMetadata);
}
@@ -227,7 +227,7 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
+ public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
return CompletableFuture.completedFuture(lastSnapshotIndex);
// do nothing
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 070c5fa..1142bcf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -24,7 +24,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.server.storage.RaftStorage;
@@ -76,7 +76,7 @@ import com.codahale.metrics.Timer;
* in segments should be no smaller than the last index of snapshot, otherwise
* we may have hole when append further log.
*/
-public class SegmentedRaftLog extends RaftLog {
+public class SegmentedRaftLog extends RaftLogBase {
/**
* I/O task definitions.
*/
@@ -286,7 +286,7 @@ public class SegmentedRaftLog extends RaftLog {
throw new RaftLogIOException("Log entry not found: index = " + index);
}
if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
- return new EntryWithData(entry, null);
+ return newEntryWithData(entry, null);
}
try {
@@ -297,7 +297,7 @@ public class SegmentedRaftLog extends RaftLog {
return null;
});
}
- return new EntryWithData(entry, future);
+ return newEntryWithData(entry, future);
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " +
LogProtoUtils.toLogEntryString(entry);
@@ -465,7 +465,7 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- public void writeMetadata(RaftStorageMetadata metadata) throws IOException {
+ public void persistMetadata(RaftStorageMetadata metadata) throws IOException {
storage.getMetadataFile().persist(metadata);
}
@@ -475,7 +475,7 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- public CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex) {
+ public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
// TODO purge normal/tmp/corrupt snapshot files
// if the last index in snapshot is larger than the index of the last
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index f614a9f..598faa3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -39,7 +39,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -75,8 +75,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
public static final int NUM_SERVERS = 3;
- private static final DelayLocalExecutionInjection logSyncDelay =
- new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+ private static final DelayLocalExecutionInjection logSyncDelay = RaftServerTestUtil.getLogSyncDelay();
{
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 7abddf3..6cfe89f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -34,6 +34,7 @@ import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
@@ -484,7 +485,7 @@ public interface RaftTestUtil {
}
static LogEntryProto getLastEntry(LogEntryBodyCase targetCase, RaftLog raftLog) throws Exception {
- try(AutoCloseableLock readLock = raftLog.readLock()) {
+ try(AutoCloseableLock readLock = ((RaftLogBase)raftLog).readLock()) {
long i = raftLog.getNextIndex() - 1;
for(; i >= 0; i--) {
final LogEntryProto entry = raftLog.get(i);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 8ee057e..6bd11f7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -40,6 +40,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster.PeerChanges;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
@@ -69,8 +70,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
}
- private static final DelayLocalExecutionInjection logSyncDelay =
- new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+ private static final DelayLocalExecutionInjection logSyncDelay = RaftServerTestUtil.getLogSyncDelay();
private static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
new DelayLocalExecutionInjection(LeaderStateImpl.APPEND_PLACEHOLDER);
@@ -573,7 +573,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
}
void runTestRevertConfigurationChange(CLUSTER cluster) throws Exception {
- RaftLog log2 = null;
+ RaftLogBase log2 = null;
try {
RaftTestUtil.waitForLeader(cluster);
@@ -581,7 +581,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
final RaftPeerId leaderId = leader.getId();
final RaftLog log = leader.getRaftLog();
- log2 = log;
+ log2 = (RaftLogBase) log;
Thread.sleep(1000);
// we block the incoming msg for the leader and block its requests to
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index fc6bc5a..88238ca 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -52,6 +52,10 @@ public class RaftServerTestUtil {
public static final RaftGroupMemberId TEST_MEMBER_ID = RaftGroupMemberId.valueOf(
RaftPeerId.valueOf("test"), RaftGroupId.emptyGroupId());
+ public static DelayLocalExecutionInjection getLogSyncDelay() {
+ return new DelayLocalExecutionInjection(RaftServerImpl.LOG_SYNC);
+ }
+
public static void setStateMachineUpdaterLogLevel(Level level) {
Log4jUtils.setLogLevel(StateMachineUpdater.LOG, level);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index b498b29..9f192e4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -23,7 +23,7 @@ import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.RATIS_LOG_
import org.apache.ratis.metrics.RatisMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.util.AutoCloseableLock;
@@ -45,7 +45,7 @@ public interface RaftStorageTestUtils {
+ "." + memberId + "." + metricName;
}
- static void printLog(RaftLog log, Consumer<String> println) {
+ static void printLog(RaftLogBase log, Consumer<String> println) {
if (log == null) {
println.accept("log == null");
return;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index fe97f3e..2bc208b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -31,7 +31,6 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.RetryCache;
-import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.metrics.RaftLogMetricsBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -290,7 +289,7 @@ public class TestSegmentedRaftLog extends BaseTest {
assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex()
+ " (or snapshot index " + raftLog.getSnapshotIndex() + ") is greater than 1"));
- raftLog.syncWithSnapshot(raftLog.getLastEntryTermIndex().getIndex());
+ raftLog.onSnapshotInstalled(raftLog.getLastEntryTermIndex().getIndex());
try {
// append entry fails if there are no log entries && log's snapshotIndex + 1 < incoming log entry.
raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0))
@@ -324,7 +323,7 @@ public class TestSegmentedRaftLog extends BaseTest {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.subList(0, entries.size() - 1).stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
- raftLog.syncWithSnapshot(desiredSnapshotIndex);
+ raftLog.onSnapshotInstalled(desiredSnapshotIndex);
// Try appending last entry after snapshot + purge.
CompletableFuture<Long> appendEntryFuture =
raftLog.appendEntry(entries.get(entries.size() - 1));