You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/05 14:41:08 UTC

[incubator-ratis] branch master updated: RATIS-1131. Refactor the notifyXxx() methods in StateMachine. (#256)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1429bf1  RATIS-1131. Refactor the notifyXxx() methods in StateMachine. (#256)
1429bf1 is described below

commit 1429bf128a735102745737cdf0c456c24c2baace
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 5 22:41:02 2020 +0800

    RATIS-1131. Refactor the notifyXxx() methods in StateMachine. (#256)
---
 .../org/apache/ratis/server/impl/LeaderState.java  |   6 +-
 .../org/apache/ratis/server/impl/LogAppender.java  |   2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  22 +-
 .../apache/ratis/server/impl/RaftServerProxy.java  |   6 +-
 .../org/apache/ratis/server/impl/ServerState.java  |   2 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java |   2 +-
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |   4 +-
 .../apache/ratis/statemachine/StateMachine.java    | 257 +++++++++++++--------
 .../ratis/statemachine/TransactionContext.java     |   3 +-
 .../ratis/statemachine/impl/BaseStateMachine.java  |  11 +-
 .../statemachine/SimpleStateMachine4Testing.java   |   9 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    |   6 +-
 12 files changed, 187 insertions(+), 143 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index fc472dc..0791f83 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -288,7 +288,7 @@ public class LeaderState {
     final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
     try {
       final Collection<TransactionContext> transactions = pendingRequests.sendNotLeaderResponses(nle, commitInfos);
-      server.getStateMachine().notifyNotLeader(transactions);
+      server.getStateMachine().leaderEvent().notifyNotLeader(transactions);
       watchRequests.failWatches(nle);
     } catch (IOException e) {
       LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
@@ -297,9 +297,7 @@ public class LeaderState {
     server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
-    if (pendingRequests != null) {
-      pendingRequests.close();
-    }
+    pendingRequests.close();
   }
 
   void notifySenders() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 89ea15e..a1743ca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -563,7 +563,7 @@ public class LogAppender {
 
   protected void checkSlowness() {
     if (follower.isSlow()) {
-      server.getStateMachine().notifySlowness(server.getRoleInfoProto());
+      server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getRoleInfoProto());
     }
     leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer(),
         follower.getLastRpcResponseTime().elapsedTime().getDuration());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2277279..92fda73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -286,11 +286,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
    * @param deleteDirectory
    * @param renameDirectory
    */
-  public void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
+  void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
     final RaftStorageDirectory dir = state.getStorage().getStorageDir();
 
     /* Shutdown is triggered here inorder to avoid any locked files. */
     shutdown();
+    getStateMachine().event().notifyGroupRemove();
     if (deleteDirectory) {
       for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) {
         try {
@@ -454,7 +455,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), getGroup());
   }
 
-  public RoleInfoProto getRoleInfoProto() {
+  RoleInfoProto getRoleInfoProto() {
     RaftPeerRole currentRole = role.getCurrentRole();
     RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
         .setSelf(getPeer().getRaftPeerProto())
@@ -499,7 +500,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     role.shutdownFollowerState();
     setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
     if (state.shouldNotifyExtendedNoLeader()) {
-      stateMachine.notifyExtendedNoLeader(getRoleInfoProto());
+      stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto());
     }
     // start election
     role.startLeaderElection(this);
@@ -1286,8 +1287,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
         state.setRaftConf(newConfLogEntryProto);
         state.writeRaftConfiguration(newConfLogEntryProto);
-        stateMachine.notifyConfigurationChange(newConfLogEntryProto.getTerm(),
-            newConfLogEntryProto.getIndex(),
+        stateMachine.event().notifyConfigurationChanged(newConfLogEntryProto.getTerm(), newConfLogEntryProto.getIndex(),
             newConfLogEntryProto.getConfigurationEntry());
       }
       return reply;
@@ -1394,7 +1394,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
 
         try {
-          stateMachine.notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
+          stateMachine.followerEvent().notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
               .whenComplete((reply, exception) -> {
                 if (exception != null) {
                   LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
@@ -1511,14 +1511,14 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
   CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
     if (!next.hasStateMachineLogEntry()) {
-      stateMachine.notifyIndexUpdate(next.getTerm(), next.getIndex());
+      stateMachine.event().notifyTermIndexUpdated(next.getTerm(), next.getIndex());
     }
+
     if (next.hasConfigurationEntry()) {
       // the reply should have already been set. only need to record
       // the new conf in the metadata file and notify the StateMachine.
       state.writeRaftConfiguration(next);
-      stateMachine.notifyConfigurationChange(next.getTerm(), next.getIndex(),
-            next.getConfigurationEntry());
+      stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry());
     } else if (next.hasStateMachineLogEntry()) {
       // check whether there is a TransactionContext because we are the leader.
       TransactionContext trx = role.getLeaderState()
@@ -1533,9 +1533,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       trx = stateMachine.applyTransactionSerial(trx);
 
       try {
-        // TODO: This step can be parallelized
-        CompletableFuture<Message> stateMachineFuture =
-            stateMachine.applyTransaction(trx);
+        final CompletableFuture<Message> stateMachineFuture = stateMachine.applyTransaction(trx);
         return replyPendingRequest(next, stateMachineFuture);
       } catch (Exception e) {
         LOG.error("{}: applyTransaction failed for index:{} proto:{}",
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index b1332e1..4924927 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -23,7 +23,6 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
@@ -54,7 +53,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -441,10 +439,8 @@ public class RaftServerProxy implements RaftServer {
           getId() + ": Group " + groupId + " not found."));
     }
     return f.thenApply(impl -> {
-      final Collection<CommitInfoProto> commitInfos = impl.getCommitInfos();
       impl.groupRemove(deleteDirectory, renameDirectory);
-      impl.getStateMachine().notifyGroupRemove();
-      return new RaftClientReply(request, commitInfos);
+      return new RaftClientReply(request, impl.getCommitInfos());
     });
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 9f0ddbc..bb64866 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -251,7 +251,7 @@ public class ServerState implements Closeable {
         Timestamp previous = lastNoLeaderTime;
         lastNoLeaderTime = null;
         suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms";
-        server.getStateMachine().notifyLeaderChanged(getMemberId(), newLeaderId);
+        server.getStateMachine().event().notifyLeaderChanged(getMemberId(), newLeaderId);
       }
       LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
           getMemberId(), leaderId, newLeaderId, getCurrentTerm(), op, suffix);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index b6ab4b3..100bde4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -303,7 +303,7 @@ public class SegmentedRaftLog extends RaftLog {
       CompletableFuture<ByteString> future = null;
       if (stateMachine != null) {
         future = stateMachine.data().read(entry).exceptionally(ex -> {
-          stateMachine.notifyLogFailed(ex, entry);
+          stateMachine.event().notifyLogFailed(ex, entry);
           return null;
         });
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 42f2ab1..35cf209 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -478,7 +478,7 @@ class SegmentedRaftLogWorker implements Runnable {
 
     @Override
     void failed(IOException e) {
-      stateMachine.notifyLogFailed(e, entry);
+      stateMachine.event().notifyLogFailed(e, entry);
       super.failed(e);
     }
 
@@ -563,7 +563,7 @@ class SegmentedRaftLogWorker implements Runnable {
     @Override
     void failed(IOException e) {
       // not failed for a specific log entry, but an entire segment
-      stateMachine.notifyLogFailed(e, null);
+      stateMachine.event().notifyLogFailed(e, null);
       super.failed(e);
     }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 2be7316..351137d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -54,6 +54,11 @@ public interface StateMachine extends Closeable {
   interface Registry extends Function<RaftGroupId, StateMachine> {
   }
 
+  /**
+   * An optional API for managing data outside the {@link org.apache.ratis.server.raftlog.RaftLog}.
+   * For data intensive applications, it can be more efficient to implement this API
+   * in order to support zero buffer copying and a lightweight {@link org.apache.ratis.server.raftlog.RaftLog}.
+   */
   interface DataApi {
     /** A noop implementation of {@link DataApi}. */
     DataApi DEFAULT = new DataApi() {};
@@ -118,6 +123,114 @@ public interface StateMachine extends Closeable {
   }
 
   /**
+   * An optional API for event notifications.
+   */
+  interface EventApi {
+    /** A noop implementation of {@link EventApi}. */
+    EventApi DEFAULT = new EventApi() {};
+
+    /**
+     * Notify the {@link StateMachine} that a new leader has been elected.
+     * Note that the new leader can possibly be this server.
+     *
+     * @param groupMemberId The id of this server.
+     * @param newLeaderId The id of the new leader.
+     */
+    default void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {}
+
+    /**
+     * Notify the {@link StateMachine} of a term-index update event.
+     * This method will be invoked when a {@link RaftProtos.MetadataProto}
+     * or {@link RaftProtos.RaftConfigurationProto} is processed.
+     * For {@link RaftProtos.StateMachineLogEntryProto}, this method will not be invoked.
+     *
+     * @param term The term of the log entry
+     * @param index The index of the log entry
+     */
+    default void notifyTermIndexUpdated(long term, long index) {}
+
+    /**
+     * Notify the {@link StateMachine} of a configuration change.
+     * This method will be invoked when a {@link RaftProtos.RaftConfigurationProto} is processed.
+     *
+     * @param term term of the current log entry
+     * @param index index which is being updated
+     * @param newRaftConfiguration new configuration
+     */
+    default void notifyConfigurationChanged(long term, long index, RaftConfigurationProto newRaftConfiguration) {}
+
+    /**
+     * Notify the {@link StateMachine} of a group removal event.
+     * This method is invoked after all the pending transactions have been applied by the {@link StateMachine}.
+     */
+    default void notifyGroupRemove() {}
+
+    /**
+     * Notify the {@link StateMachine} that a log operation failed.
+     *
+     * @param cause The cause of the failure.
+     * @param failedEntry The failed log entry, if there is any.
+     */
+    default void notifyLogFailed(Throwable cause, LogEntryProto failedEntry) {}
+  }
+
+  /**
+   * An optional API for leader-only event notifications.
+   * The methods in this interface will be invoked only when the server is the leader.
+   */
+  interface LeaderEventApi {
+    /** A noop implementation of {@link LeaderEventApi}. */
+    LeaderEventApi DEFAULT = new LeaderEventApi() {};
+
+    /**
+     * Notify the {@link StateMachine} that the given follower is slow.
+     * This notification is based on "raft.server.rpc.slowness.timeout".
+     *
+     * @param roleInfoProto information about the current node role and rpc delay information
+     *
+     * @see org.apache.ratis.server.RaftServerConfigKeys.Rpc#SLOWNESS_TIMEOUT_KEY
+     */
+    default void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {}
+
+    /**
+     * Notify the {@link StateMachine} that this server is no longer the leader.
+     */
+    default void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {}
+  }
+
+  /**
+   * An optional API for follower-only event notifications.
+   * The methods in this interface will be invoked only when the server is a follower.
+   */
+  interface FollowerEventApi {
+    /** A noop implementation of {@link FollowerEventApi}. */
+    FollowerEventApi DEFAULT = new FollowerEventApi() {};
+
+    /**
+     * Notify the {@link StateMachine} that there is no leader in the group for an extended period of time.
+     * This notification is based on "raft.server.notification.no-leader.timeout".
+     *
+     * @param roleInfoProto information about the current node role and rpc delay information
+     *
+     * @see org.apache.ratis.server.RaftServerConfigKeys.Notification#NO_LEADER_TIMEOUT_KEY
+     */
+    default void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {}
+
+    /**
+     * Notify the {@link StateMachine} that the leader has purged entries from its log.
+     * In order to catch up, the {@link StateMachine} has to install the latest snapshot asynchronously.
+     *
+     * @param roleInfoProto information about the current node role and rpc delay information.
+     * @param firstTermIndexInLog The term-index of the first append entry available in the leader's log.
+     * @return return the last term-index in the snapshot after the snapshot installation.
+     */
+    default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+        RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+      return CompletableFuture.completedFuture(null);
+    }
+  }
+
+  /**
    * For streaming state machine data.
    */
   interface DataStream {
@@ -138,21 +251,64 @@ public interface StateMachine extends Closeable {
   }
 
   /**
-   * Get the optional {@link DataApi} object.
+   * Get the {@link DataApi} object.
    *
-   * If this {@link StateMachine} chooses to support {@link DataApi},
+   * If this {@link StateMachine} chooses to support the optional {@link DataApi},
    * it may either implement {@link DataApi} directly or override this method to return a {@link DataApi} object.
-   *
    * Otherwise, this {@link StateMachine} does not support {@link DataApi}.
    * Then, this method returns the default noop {@link DataApi} object.
    *
-   * @return The optional {@link DataApi} object.
+   * @return The {@link DataApi} object.
    */
   default DataApi data() {
     return this instanceof DataApi? (DataApi)this : DataApi.DEFAULT;
   }
 
   /**
+   * Get the {@link EventApi} object.
+   *
+   * If this {@link StateMachine} chooses to support the optional {@link EventApi},
+   * it may either implement {@link EventApi} directly or override this method to return an {@link EventApi} object.
+   * Otherwise, this {@link StateMachine} does not support {@link EventApi}.
+   * Then, this method returns the default noop {@link EventApi} object.
+   *
+   * @return The {@link EventApi} object.
+   */
+  default EventApi event() {
+    return this instanceof EventApi ? (EventApi)this : EventApi.DEFAULT;
+  }
+
+  /**
+   * Get the {@link LeaderEventApi} object.
+   *
+   * If this {@link StateMachine} chooses to support the optional {@link LeaderEventApi},
+   * it may either implement {@link LeaderEventApi} directly
+   * or override this method to return a {@link LeaderEventApi} object.
+   * Otherwise, this {@link StateMachine} does not support {@link LeaderEventApi}.
+   * Then, this method returns the default noop {@link LeaderEventApi} object.
+   *
+   * @return The {@link LeaderEventApi} object.
+   */
+  default LeaderEventApi leaderEvent() {
+    return this instanceof LeaderEventApi? (LeaderEventApi)this : LeaderEventApi.DEFAULT;
+  }
+
+  /**
+   * Get the {@link FollowerEventApi} object.
+   *
+   * If this {@link StateMachine} chooses to support the optional {@link FollowerEventApi},
+   * it may either implement {@link FollowerEventApi} directly
+   * or override this method to return a {@link FollowerEventApi} object.
+   * Otherwise, this {@link StateMachine} does not support {@link FollowerEventApi}.
+   * Then, this method returns the default noop {@link FollowerEventApi} object.
+   *
+   * @return The {@link FollowerEventApi} object.
+   */
+  default FollowerEventApi followerEvent() {
+    return this instanceof FollowerEventApi? (FollowerEventApi)this : FollowerEventApi.DEFAULT;
+  }
+
+  /**
    * Initializes the State Machine with the given server, group and storage. The state machine is
    * responsible reading the latest snapshot from the file system (if any) and initialize itself
    * with the latest term and index there including all the edits.
@@ -264,109 +420,18 @@ public interface StateMachine extends Closeable {
   TransactionContext applyTransactionSerial(TransactionContext trx);
 
   /**
-   * Called to notify state machine about indexes which are processed
-   * internally by Raft Server, this currently happens when metadata entries are
-   * processed in raft Server. This keep state machine to keep a track of index
-   * updates.
-   * @param term term of the current log entry
-   * @param index index which is being updated
-   */
-  default void notifyIndexUpdate(long term, long index) {
-
-  }
-
-  /**
-   * Called to notify state machine about configuration changes to the Raft
-   * Server. This currently happens when conf entries are processed in raft
-   * server. This allows state machine to keep track of configuration changes
-   * on the raft server and also track the index updates corresponding to
-   * configuration changes.
-   * @param term term of the current log entry
-   * @param index index which is being updated
-   * @param newRaftConfiguration new configuration
-   */
-  default void notifyConfigurationChange(long term, long index,
-      RaftConfigurationProto newRaftConfiguration) {
-  }
-
-  /**
    * Apply a committed log entry to the state machine. This method can be called concurrently with
    * the other calls, and there is no guarantee that the calls will be ordered according to the
    * log commit order.
    * @param trx the transaction state including the log entry that has been committed to a quorum
    *            of the raft peers
    */
-  // TODO: We do not need to return CompletableFuture
   CompletableFuture<Message> applyTransaction(TransactionContext trx);
 
+  /** @return the last term-index applied by this {@link StateMachine}. */
   TermIndex getLastAppliedTermIndex();
 
   /**
-   * Notify the state machine that the raft peer is no longer leader.
-   */
-  void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException;
-
-  /**
-   * Notify the Leader's state machine that one of the followers is slow
-   * this notification is based on "raft.server.rpc.slowness.timeout"
-   *
-   * @param roleInfoProto information about the current node role and rpc delay information
-   */
-  default void notifySlowness(RoleInfoProto roleInfoProto) {
-
-  }
-
-  /**
-   * Notify the state machine that the pipeline has failed.
-   * This notification is triggered when a log operation throws an Exception.
-   * @param t Exception which was caught, indicates possible cause.
-   * @param failedEntry if append failed for a specific entry, null otherwise.
-   */
-  default void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
-
-  }
-
-  /**
-   * Notify the Leader's state machine that a leader has not been elected for a long time
-   * this notification is based on "raft.server.leader.election.timeout"
-   *
-   * @param roleInfoProto information about the current node role and rpc delay information
-   */
-  default void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
-
-  }
-
-  /**
-   * Notify the Follower's state machine that the leader has purged entries
-   * from its log and hence to catch up, the Follower state machine would have
-   * to install the latest snapshot.
-   * @param firstTermIndexInLog TermIndex of the first append entry available
-   *                           in the Leader's log.
-   * @param roleInfoProto information about the current node role and
-   *                            rpc delay information
-   * @return After the snapshot installation is complete, return the last
-   * included term index in the snapshot.
-   */
-  default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
-      RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
-    return CompletableFuture.completedFuture(null);
-  }
-
-  /**
-   * Notify the state machine that a RaftPeer has been elected as leader.
-   */
-  default void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId raftPeerId) {
-  }
-
-  /**
-   * Notify about group removal in the state machine. This function is called
-   * during group removal after all the pending transactions have been applied
-   * by the state machine.
-   */
-  default void notifyGroupRemove() {
-  }
-
-  /**
    * Converts the proto object into a useful log string to add information about state machine data.
    * @param proto state machine proto
    * @return the string representation of the proto.
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index 76e8509..17738e3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -40,8 +40,7 @@ import java.util.Objects;
  * a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns
  * a {@link TransactionContext} with the changes from the {@link StateMachine}.
  * The same context will be passed back to the {@link StateMachine}
- * via the {@link StateMachine#applyTransaction(TransactionContext)} call
- * or the {@link StateMachine#notifyNotLeader(java.util.Collection)} call.
+ * via the {@link StateMachine#applyTransaction(TransactionContext)} call.
  *
  * In the second case, the {@link StateMachine} is a follower.
  * The {@link TransactionContext} will be a committed entry coming from
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 6032ee3..4f0de53 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -36,7 +36,6 @@ import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -46,7 +45,8 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * Base implementation for StateMachines.
  */
-public class BaseStateMachine implements StateMachine, StateMachine.DataApi {
+public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
+    StateMachine.EventApi, StateMachine.LeaderEventApi, StateMachine.FollowerEventApi {
   private final CompletableFuture<RaftServer> server = new CompletableFuture<>();
   private volatile RaftGroupId groupId;
   private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
@@ -93,11 +93,6 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi {
   }
 
   @Override
-  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
-    // do nothing
-  }
-
-  @Override
   public void pause() {
   }
 
@@ -129,7 +124,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi {
   }
 
   @Override
-  public void notifyIndexUpdate(long term, long index) {
+  public void notifyTermIndexUpdated(long term, long index) {
     updateLastAppliedTermIndex(term, index);
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 5653bd7..fcad81e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -52,7 +52,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Objects;
@@ -418,7 +417,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   }
 
   @Override
-  public void notifySlowness(RoleInfoProto roleInfoProto) {
+  public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
     LOG.info("{}: notifySlowness {}, {}", this, groupId, roleInfoProto);
     slownessInfo = roleInfoProto;
   }
@@ -430,12 +429,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   }
 
   @Override
-  public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
-      throws IOException {
-
-  }
-
-  @Override
   public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId raftPeerId) {
     if (groupMemberId.getPeerId().equals(raftPeerId)) {
       notifiedAsLeader = true;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index fddcb04..ce81bb9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -560,10 +560,10 @@ public class TestSegmentedRaftLog extends BaseTest {
       }
 
       @Override
-      public void notifyLogFailed(Throwable t, LogEntryProto entry) {
-        LOG.info("Test StateMachine : Ratis log failed notification received, "
-            + "as expected. Transition to PAUSED state.");
+      public void notifyLogFailed(Throwable cause, LogEntryProto entry) {
+        LOG.info("Test StateMachine: Ratis log failed notification received as expected.", cause);
 
+        LOG.info("Test StateMachine: Transition to PAUSED state.");
         Assert.assertNotNull(entry);
 
         getLifeCycle().transition(LifeCycle.State.PAUSING);