You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/05 14:41:08 UTC
[incubator-ratis] branch master updated: RATIS-1131. Refactor the
notifyXxx() methods in StateMachine. (#256)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1429bf1 RATIS-1131. Refactor the notifyXxx() methods in StateMachine. (#256)
1429bf1 is described below
commit 1429bf128a735102745737cdf0c456c24c2baace
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 5 22:41:02 2020 +0800
RATIS-1131. Refactor the notifyXxx() methods in StateMachine. (#256)
---
.../org/apache/ratis/server/impl/LeaderState.java | 6 +-
.../org/apache/ratis/server/impl/LogAppender.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 22 +-
.../apache/ratis/server/impl/RaftServerProxy.java | 6 +-
.../org/apache/ratis/server/impl/ServerState.java | 2 +-
.../server/raftlog/segmented/SegmentedRaftLog.java | 2 +-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 4 +-
.../apache/ratis/statemachine/StateMachine.java | 257 +++++++++++++--------
.../ratis/statemachine/TransactionContext.java | 3 +-
.../ratis/statemachine/impl/BaseStateMachine.java | 11 +-
.../statemachine/SimpleStateMachine4Testing.java | 9 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 6 +-
12 files changed, 187 insertions(+), 143 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index fc472dc..0791f83 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -288,7 +288,7 @@ public class LeaderState {
final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
try {
final Collection<TransactionContext> transactions = pendingRequests.sendNotLeaderResponses(nle, commitInfos);
- server.getStateMachine().notifyNotLeader(transactions);
+ server.getStateMachine().leaderEvent().notifyNotLeader(transactions);
watchRequests.failWatches(nle);
} catch (IOException e) {
LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
@@ -297,9 +297,7 @@ public class LeaderState {
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
- if (pendingRequests != null) {
- pendingRequests.close();
- }
+ pendingRequests.close();
}
void notifySenders() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 89ea15e..a1743ca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -563,7 +563,7 @@ public class LogAppender {
protected void checkSlowness() {
if (follower.isSlow()) {
- server.getStateMachine().notifySlowness(server.getRoleInfoProto());
+ server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getRoleInfoProto());
}
leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer(),
follower.getLastRpcResponseTime().elapsedTime().getDuration());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2277279..92fda73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -286,11 +286,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
* @param deleteDirectory
* @param renameDirectory
*/
- public void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
+ void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
final RaftStorageDirectory dir = state.getStorage().getStorageDir();
/* Shutdown is triggered here inorder to avoid any locked files. */
shutdown();
+ getStateMachine().event().notifyGroupRemove();
if (deleteDirectory) {
for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) {
try {
@@ -454,7 +455,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), getGroup());
}
- public RoleInfoProto getRoleInfoProto() {
+ RoleInfoProto getRoleInfoProto() {
RaftPeerRole currentRole = role.getCurrentRole();
RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
.setSelf(getPeer().getRaftPeerProto())
@@ -499,7 +500,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
role.shutdownFollowerState();
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
if (state.shouldNotifyExtendedNoLeader()) {
- stateMachine.notifyExtendedNoLeader(getRoleInfoProto());
+ stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto());
}
// start election
role.startLeaderElection(this);
@@ -1286,8 +1287,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
state.setRaftConf(newConfLogEntryProto);
state.writeRaftConfiguration(newConfLogEntryProto);
- stateMachine.notifyConfigurationChange(newConfLogEntryProto.getTerm(),
- newConfLogEntryProto.getIndex(),
+ stateMachine.event().notifyConfigurationChanged(newConfLogEntryProto.getTerm(), newConfLogEntryProto.getIndex(),
newConfLogEntryProto.getConfigurationEntry());
}
return reply;
@@ -1394,7 +1394,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
try {
- stateMachine.notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
+ stateMachine.followerEvent().notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
@@ -1511,14 +1511,14 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
if (!next.hasStateMachineLogEntry()) {
- stateMachine.notifyIndexUpdate(next.getTerm(), next.getIndex());
+ stateMachine.event().notifyTermIndexUpdated(next.getTerm(), next.getIndex());
}
+
if (next.hasConfigurationEntry()) {
// the reply should have already been set. only need to record
// the new conf in the metadata file and notify the StateMachine.
state.writeRaftConfiguration(next);
- stateMachine.notifyConfigurationChange(next.getTerm(), next.getIndex(),
- next.getConfigurationEntry());
+ stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry());
} else if (next.hasStateMachineLogEntry()) {
// check whether there is a TransactionContext because we are the leader.
TransactionContext trx = role.getLeaderState()
@@ -1533,9 +1533,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
trx = stateMachine.applyTransactionSerial(trx);
try {
- // TODO: This step can be parallelized
- CompletableFuture<Message> stateMachineFuture =
- stateMachine.applyTransaction(trx);
+ final CompletableFuture<Message> stateMachineFuture = stateMachine.applyTransaction(trx);
return replyPendingRequest(next, stateMachineFuture);
} catch (Exception e) {
LOG.error("{}: applyTransaction failed for index:{} proto:{}",
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index b1332e1..4924927 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -23,7 +23,6 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
@@ -54,7 +53,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -441,10 +439,8 @@ public class RaftServerProxy implements RaftServer {
getId() + ": Group " + groupId + " not found."));
}
return f.thenApply(impl -> {
- final Collection<CommitInfoProto> commitInfos = impl.getCommitInfos();
impl.groupRemove(deleteDirectory, renameDirectory);
- impl.getStateMachine().notifyGroupRemove();
- return new RaftClientReply(request, commitInfos);
+ return new RaftClientReply(request, impl.getCommitInfos());
});
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 9f0ddbc..bb64866 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -251,7 +251,7 @@ public class ServerState implements Closeable {
Timestamp previous = lastNoLeaderTime;
lastNoLeaderTime = null;
suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms";
- server.getStateMachine().notifyLeaderChanged(getMemberId(), newLeaderId);
+ server.getStateMachine().event().notifyLeaderChanged(getMemberId(), newLeaderId);
}
LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
getMemberId(), leaderId, newLeaderId, getCurrentTerm(), op, suffix);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index b6ab4b3..100bde4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -303,7 +303,7 @@ public class SegmentedRaftLog extends RaftLog {
CompletableFuture<ByteString> future = null;
if (stateMachine != null) {
future = stateMachine.data().read(entry).exceptionally(ex -> {
- stateMachine.notifyLogFailed(ex, entry);
+ stateMachine.event().notifyLogFailed(ex, entry);
return null;
});
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 42f2ab1..35cf209 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -478,7 +478,7 @@ class SegmentedRaftLogWorker implements Runnable {
@Override
void failed(IOException e) {
- stateMachine.notifyLogFailed(e, entry);
+ stateMachine.event().notifyLogFailed(e, entry);
super.failed(e);
}
@@ -563,7 +563,7 @@ class SegmentedRaftLogWorker implements Runnable {
@Override
void failed(IOException e) {
// not failed for a specific log entry, but an entire segment
- stateMachine.notifyLogFailed(e, null);
+ stateMachine.event().notifyLogFailed(e, null);
super.failed(e);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 2be7316..351137d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -54,6 +54,11 @@ public interface StateMachine extends Closeable {
interface Registry extends Function<RaftGroupId, StateMachine> {
}
+ /**
+ * An optional API for managing data outside the {@link org.apache.ratis.server.raftlog.RaftLog}.
+ * For data intensive applications, it can be more efficient to implement this API
+ * in order to support zero buffer copying and a lightweight {@link org.apache.ratis.server.raftlog.RaftLog}.
+ */
interface DataApi {
/** A noop implementation of {@link DataApi}. */
DataApi DEFAULT = new DataApi() {};
@@ -118,6 +123,114 @@ public interface StateMachine extends Closeable {
}
/**
+ * An optional API for event notifications.
+ */
+ interface EventApi {
+ /** A noop implementation of {@link EventApi}. */
+ EventApi DEFAULT = new EventApi() {};
+
+ /**
+ * Notify the {@link StateMachine} that a new leader has been elected.
+ * Note that the new leader can possibly be this server.
+ *
+ * @param groupMemberId The id of this server.
+ * @param newLeaderId The id of the new leader.
+ */
+ default void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {}
+
+ /**
+ * Notify the {@link StateMachine} of a term-index update event.
+ * This method will be invoked when a {@link RaftProtos.MetadataProto}
+ * or {@link RaftProtos.RaftConfigurationProto} is processed.
+ * For {@link RaftProtos.StateMachineLogEntryProto}, this method will not be invoked.
+ *
+ * @param term The term of the log entry
+ * @param index The index of the log entry
+ */
+ default void notifyTermIndexUpdated(long term, long index) {}
+
+ /**
+ * Notify the {@link StateMachine} of a configuration change.
+ * This method will be invoked when a {@link RaftProtos.RaftConfigurationProto} is processed.
+ *
+ * @param term term of the current log entry
+ * @param index index which is being updated
+ * @param newRaftConfiguration new configuration
+ */
+ default void notifyConfigurationChanged(long term, long index, RaftConfigurationProto newRaftConfiguration) {}
+
+ /**
+ * Notify the {@link StateMachine} of a group removal event.
+ * This method is invoked after all the pending transactions have been applied by the {@link StateMachine}.
+ */
+ default void notifyGroupRemove() {}
+
+ /**
+ * Notify the {@link StateMachine} that a log operation failed.
+ *
+ * @param cause The cause of the failure.
+ * @param failedEntry The failed log entry, if there is any.
+ */
+ default void notifyLogFailed(Throwable cause, LogEntryProto failedEntry) {}
+ }
+
+ /**
+ * An optional API for leader-only event notifications.
+ * The methods in this interface will be invoked only when the server is the leader.
+ */
+ interface LeaderEventApi {
+ /** A noop implementation of {@link LeaderEventApi}. */
+ LeaderEventApi DEFAULT = new LeaderEventApi() {};
+
+ /**
+ * Notify the {@link StateMachine} that the given follower is slow.
+ * This notification is based on "raft.server.rpc.slowness.timeout".
+ *
+ * @param roleInfoProto information about the current node role and rpc delay information
+ *
+ * @see org.apache.ratis.server.RaftServerConfigKeys.Rpc#SLOWNESS_TIMEOUT_KEY
+ */
+ default void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {}
+
+ /**
+ * Notify the {@link StateMachine} that this server is no longer the leader.
+ */
+ default void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {}
+ }
+
+ /**
+ * An optional API for follower-only event notifications.
+ * The methods in this interface will be invoked only when the server is a follower.
+ */
+ interface FollowerEventApi {
+ /** A noop implementation of {@link FollowerEventApi}. */
+ FollowerEventApi DEFAULT = new FollowerEventApi() {};
+
+ /**
+ * Notify the {@link StateMachine} that there is no leader in the group for an extended period of time.
+ * This notification is based on "raft.server.notification.no-leader.timeout".
+ *
+ * @param roleInfoProto information about the current node role and rpc delay information
+ *
+ * @see org.apache.ratis.server.RaftServerConfigKeys.Notification#NO_LEADER_TIMEOUT_KEY
+ */
+ default void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {}
+
+ /**
+ * Notify the {@link StateMachine} that the leader has purged entries from its log.
+ * In order to catch up, the {@link StateMachine} has to install the latest snapshot asynchronously.
+ *
+ * @param roleInfoProto information about the current node role and rpc delay information.
+ * @param firstTermIndexInLog The term-index of the first append entry available in the leader's log.
+ * @return the last term-index in the snapshot after the snapshot installation.
+ */
+ default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+ RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ /**
* For streaming state machine data.
*/
interface DataStream {
@@ -138,21 +251,64 @@ public interface StateMachine extends Closeable {
}
/**
- * Get the optional {@link DataApi} object.
+ * Get the {@link DataApi} object.
*
- * If this {@link StateMachine} chooses to support {@link DataApi},
+ * If this {@link StateMachine} chooses to support the optional {@link DataApi},
* it may either implement {@link DataApi} directly or override this method to return a {@link DataApi} object.
- *
* Otherwise, this {@link StateMachine} does not support {@link DataApi}.
* Then, this method returns the default noop {@link DataApi} object.
*
- * @return The optional {@link DataApi} object.
+ * @return The {@link DataApi} object.
*/
default DataApi data() {
return this instanceof DataApi? (DataApi)this : DataApi.DEFAULT;
}
/**
+ * Get the {@link EventApi} object.
+ *
+ * If this {@link StateMachine} chooses to support the optional {@link EventApi},
+ * it may either implement {@link EventApi} directly or override this method to return an {@link EventApi} object.
+ * Otherwise, this {@link StateMachine} does not support {@link EventApi}.
+ * Then, this method returns the default noop {@link EventApi} object.
+ *
+ * @return The {@link EventApi} object.
+ */
+ default EventApi event() {
+ return this instanceof EventApi ? (EventApi)this : EventApi.DEFAULT;
+ }
+
+ /**
+ * Get the {@link LeaderEventApi} object.
+ *
+ * If this {@link StateMachine} chooses to support the optional {@link LeaderEventApi},
+ * it may either implement {@link LeaderEventApi} directly
+ * or override this method to return a {@link LeaderEventApi} object.
+ * Otherwise, this {@link StateMachine} does not support {@link LeaderEventApi}.
+ * Then, this method returns the default noop {@link LeaderEventApi} object.
+ *
+ * @return The {@link LeaderEventApi} object.
+ */
+ default LeaderEventApi leaderEvent() {
+ return this instanceof LeaderEventApi? (LeaderEventApi)this : LeaderEventApi.DEFAULT;
+ }
+
+ /**
+ * Get the {@link FollowerEventApi} object.
+ *
+ * If this {@link StateMachine} chooses to support the optional {@link FollowerEventApi},
+ * it may either implement {@link FollowerEventApi} directly
+ * or override this method to return a {@link FollowerEventApi} object.
+ * Otherwise, this {@link StateMachine} does not support {@link FollowerEventApi}.
+ * Then, this method returns the default noop {@link FollowerEventApi} object.
+ *
+ * @return The {@link FollowerEventApi} object.
+ */
+ default FollowerEventApi followerEvent() {
+ return this instanceof FollowerEventApi? (FollowerEventApi)this : FollowerEventApi.DEFAULT;
+ }
+
+ /**
* Initializes the State Machine with the given server, group and storage. The state machine is
* responsible reading the latest snapshot from the file system (if any) and initialize itself
* with the latest term and index there including all the edits.
@@ -264,109 +420,18 @@ public interface StateMachine extends Closeable {
TransactionContext applyTransactionSerial(TransactionContext trx);
/**
- * Called to notify state machine about indexes which are processed
- * internally by Raft Server, this currently happens when metadata entries are
- * processed in raft Server. This keep state machine to keep a track of index
- * updates.
- * @param term term of the current log entry
- * @param index index which is being updated
- */
- default void notifyIndexUpdate(long term, long index) {
-
- }
-
- /**
- * Called to notify state machine about configuration changes to the Raft
- * Server. This currently happens when conf entries are processed in raft
- * server. This allows state machine to keep track of configuration changes
- * on the raft server and also track the index updates corresponding to
- * configuration changes.
- * @param term term of the current log entry
- * @param index index which is being updated
- * @param newRaftConfiguration new configuration
- */
- default void notifyConfigurationChange(long term, long index,
- RaftConfigurationProto newRaftConfiguration) {
- }
-
- /**
* Apply a committed log entry to the state machine. This method can be called concurrently with
* the other calls, and there is no guarantee that the calls will be ordered according to the
* log commit order.
* @param trx the transaction state including the log entry that has been committed to a quorum
* of the raft peers
*/
- // TODO: We do not need to return CompletableFuture
CompletableFuture<Message> applyTransaction(TransactionContext trx);
+ /** @return the last term-index applied by this {@link StateMachine}. */
TermIndex getLastAppliedTermIndex();
/**
- * Notify the state machine that the raft peer is no longer leader.
- */
- void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException;
-
- /**
- * Notify the Leader's state machine that one of the followers is slow
- * this notification is based on "raft.server.rpc.slowness.timeout"
- *
- * @param roleInfoProto information about the current node role and rpc delay information
- */
- default void notifySlowness(RoleInfoProto roleInfoProto) {
-
- }
-
- /**
- * Notify the state machine that the pipeline has failed.
- * This notification is triggered when a log operation throws an Exception.
- * @param t Exception which was caught, indicates possible cause.
- * @param failedEntry if append failed for a specific entry, null otherwise.
- */
- default void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
-
- }
-
- /**
- * Notify the Leader's state machine that a leader has not been elected for a long time
- * this notification is based on "raft.server.leader.election.timeout"
- *
- * @param roleInfoProto information about the current node role and rpc delay information
- */
- default void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
-
- }
-
- /**
- * Notify the Follower's state machine that the leader has purged entries
- * from its log and hence to catch up, the Follower state machine would have
- * to install the latest snapshot.
- * @param firstTermIndexInLog TermIndex of the first append entry available
- * in the Leader's log.
- * @param roleInfoProto information about the current node role and
- * rpc delay information
- * @return After the snapshot installation is complete, return the last
- * included term index in the snapshot.
- */
- default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
- RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
- return CompletableFuture.completedFuture(null);
- }
-
- /**
- * Notify the state machine that a RaftPeer has been elected as leader.
- */
- default void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId raftPeerId) {
- }
-
- /**
- * Notify about group removal in the state machine. This function is called
- * during group removal after all the pending transactions have been applied
- * by the state machine.
- */
- default void notifyGroupRemove() {
- }
-
- /**
* Converts the proto object into a useful log string to add information about state machine data.
* @param proto state machine proto
* @return the string representation of the proto.
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index 76e8509..17738e3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -40,8 +40,7 @@ import java.util.Objects;
* a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns
* a {@link TransactionContext} with the changes from the {@link StateMachine}.
* The same context will be passed back to the {@link StateMachine}
- * via the {@link StateMachine#applyTransaction(TransactionContext)} call
- * or the {@link StateMachine#notifyNotLeader(java.util.Collection)} call.
+ * via the {@link StateMachine#applyTransaction(TransactionContext)} call.
*
* In the second case, the {@link StateMachine} is a follower.
* The {@link TransactionContext} will be a committed entry coming from
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 6032ee3..4f0de53 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -36,7 +36,6 @@ import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
import java.io.IOException;
-import java.util.Collection;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -46,7 +45,8 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Base implementation for StateMachines.
*/
-public class BaseStateMachine implements StateMachine, StateMachine.DataApi {
+public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
+ StateMachine.EventApi, StateMachine.LeaderEventApi, StateMachine.FollowerEventApi {
private final CompletableFuture<RaftServer> server = new CompletableFuture<>();
private volatile RaftGroupId groupId;
private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
@@ -93,11 +93,6 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi {
}
@Override
- public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
- // do nothing
- }
-
- @Override
public void pause() {
}
@@ -129,7 +124,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi {
}
@Override
- public void notifyIndexUpdate(long term, long index) {
+ public void notifyTermIndexUpdated(long term, long index) {
updateLastAppliedTermIndex(term, index);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 5653bd7..fcad81e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -52,7 +52,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Objects;
@@ -418,7 +417,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
}
@Override
- public void notifySlowness(RoleInfoProto roleInfoProto) {
+ public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
LOG.info("{}: notifySlowness {}, {}", this, groupId, roleInfoProto);
slownessInfo = roleInfoProto;
}
@@ -430,12 +429,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
}
@Override
- public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
- throws IOException {
-
- }
-
- @Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId raftPeerId) {
if (groupMemberId.getPeerId().equals(raftPeerId)) {
notifiedAsLeader = true;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index fddcb04..ce81bb9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -560,10 +560,10 @@ public class TestSegmentedRaftLog extends BaseTest {
}
@Override
- public void notifyLogFailed(Throwable t, LogEntryProto entry) {
- LOG.info("Test StateMachine : Ratis log failed notification received, "
- + "as expected. Transition to PAUSED state.");
+ public void notifyLogFailed(Throwable cause, LogEntryProto entry) {
+ LOG.info("Test StateMachine: Ratis log failed notification received as expected.", cause);
+ LOG.info("Test StateMachine: Transition to PAUSED state.");
Assert.assertNotNull(entry);
getLifeCycle().transition(LifeCycle.State.PAUSING);