You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/09 01:57:52 UTC

[ratis] branch master updated: RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1009a6c38 RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)
1009a6c38 is described below

commit 1009a6c385373b3280cc9066e02ce050911627f8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Aug 8 18:57:47 2022 -0700

    RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)
    
    * RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc.
    
    * Revert ArithmeticStateMachine.java
    
    * Add an option to use AsyncApi
    
    * Fix a long line
    
    * Minor changes in CounterClient.
    
    * Remove the use of Charset.
    
    * Revert some CounterServer change for a findbugs warning.
    
    * Fix a bug in CounterServer.main.
---
 ratis-docs/src/site/markdown/start/index.md        | 653 +++++++++++++--------
 .../apache/ratis/examples/common/Constants.java    |   5 +-
 .../ratis/examples/counter/CounterCommand.java     |  41 ++
 .../examples/counter/client/CounterClient.java     | 118 ++--
 .../examples/counter/server/CounterServer.java     |  59 +-
 .../counter/server/CounterStateMachine.java        | 205 ++++---
 .../apache/ratis/examples/counter/TestCounter.java |  10 +-
 .../ratis/statemachine/impl/BaseStateMachine.java  |   9 +-
 8 files changed, 679 insertions(+), 421 deletions(-)

diff --git a/ratis-docs/src/site/markdown/start/index.md b/ratis-docs/src/site/markdown/start/index.md
index 7aca21f9b..beaa519ea 100644
--- a/ratis-docs/src/site/markdown/start/index.md
+++ b/ratis-docs/src/site/markdown/start/index.md
@@ -18,23 +18,51 @@
 # Getting Started
 Let's get started to use Raft in your application.
 To demonstrate how to use Ratis,
-we'll implement a simple counter service,
-which maintains a counter value across a cluster.
-The client could send the following types of commands to the cluster:
+we implement a simple *Counter* service,
+which maintains a counter value across a raft group.
+Clients could send the following types of requests to the raft group:
 
-* `INCREMENT`: increase the counter value
-* `GET`: query the current value of the counter,
-we call such kind of commands as read-only commands
+* `INCREMENT`: increase the counter value by 1.
+This command will trigger a transaction to change the state.
+* `GET`: query the current value of the counter.
+This is a read-only command since it does not change the state.
 
-Note: The full source could be found at [Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples).
-This article is mainly intended to show the steps of integration of Ratis,
-if you wish to run this example or find more examples,
+We have the following `enum` for representing the supported commands.
+
+```java
+/**
+ * The supported commands the Counter example.
+ */
+public enum CounterCommand {
+  /** Increment the counter by 1. */
+  INCREMENT,
+  /** Get the counter value. */
+  GET;
+
+  private final Message message = Message.valueOf(name());
+
+  public Message getMessage() {
+    return message;
+  }
+
+  /** Does the given command string match this command? */
+  public boolean matches(String command) {
+    return name().equalsIgnoreCase(command);
+  }
+}
+```
+
+Note:
+The source code of the Counter example and the other examples is at
+[Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples).
+This article intends to show the steps of integration of Ratis.
+If you wish to run the Counter example
 please refer to [the README](https://github.com/apache/ratis/tree/master/ratis-examples#example-3-counter).
 
-## Add the dependency
+## Adding the Dependency
 
-First, we need to add Ratis dependencies into the project,
-it's available in maven central:
+The first step is to add Ratis dependencies into the project.
+The dependencies are available in maven central:
 
 ```xml
 <dependency>
@@ -43,13 +71,14 @@ it's available in maven central:
 </dependency>
 ```
 
-Also, one of the following transports need to be added:
+Then, add one of the following transports:
 
-* grpc
-* netty
-* hadoop
+* ratis-grpc
+* ratis-netty
+* ratis-hadoop
 
-For example, let's use grpc transport:
+In this example,
+we choose to use ratis-grpc:
 
 ```xml
 <dependency>
@@ -61,294 +90,436 @@ For example, let's use grpc transport:
 Please note that Apache Hadoop dependencies are shaded,
 so it’s safe to use hadoop transport with different versions of Hadoop.
 
-## Create the StateMachine
-A state machine is used to maintain the current state of the raft node,
-the state machine is responsible for:
-
-* Execute raft logs to get the state. In this example, when a `INCREMENT` log is executed, 
-the counter value will be increased by 1.
-And a `GET` log does not affect the state but only returns the current counter value to the client.
-* Managing snapshots loading/saving.
-Snapshots are used to speed the log execution,
-the state machine could start from a snapshot point and only execute newer logs.
-
-### Define the StateMachine
-To define our state machine,
-we can extend a class from the base class `BaseStateMachine`.
-
-Also, a storage is needed to store snapshots,
-and we'll use the build-in `SimpleStateMachineStorage`,
-which is a file-based storage implementation.
-
-Since we're going to implement a count server,
-the `counter` instance is defined in the state machine,
-represents the current value.
-Below is the declaration of the state machine: 
+## Implementing the `CounterStateMachine`
+A state machine manages the application logic.
+The state machine is responsible for:
+
+* Apply raft log transactions in order to maintain the current state.
+  * When there is an `INCREMENT` request,
+    it will first be written to the raft log as a log entry.
+    Once the log entry is committed,
+    the state machine will be invoked for applying the log entry as a transaction
+    so that the counter value will be increased by 1.
+  * When there is a `GET` request,
+    it will not be written to the raft log
+    since it is a readonly request which does not change the state.
+    The state machine should return the current value of the counter.
+* Manage snapshots loading/saving.
+  * Snapshots are used for log compaction
+    so that the state machine can be restored from a snapshot
+    and then applies only the newer log entries,
+    instead of applying a long history of log starting from the beginning.
+
+We discuss how to implement `CounterStateMachine` in the following subsections.
+The complete source code of it is in
+[CounterStateMachine.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java).
+
+### Defining the State
+In this example,
+the `CounterStateMachine` extends the `BaseStateMachine`,
+which provides a base implementation of a `StateMachine`.
+
+Inside the `CounterStateMachine`,
+there is a `counter` object
+which stores the current value.
+The `counter` is an `AtomicInteger`
+in order to support concurrent access.
+We use the build-in `SimpleStateMachineStorage`,
+which is a file-based storage implementation,
+as a storage for storing snapshots.
+The fields are shown below:
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    private final SimpleStateMachineStorage storage =
-            new SimpleStateMachineStorage();
-    private AtomicInteger counter = new AtomicInteger(0);
-    // ...
+  // ...
+
+  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final AtomicInteger counter = new AtomicInteger(0);
+
+  // ...
 }
 ```
 
-### Apply Raft Log Item
+### Applying Raft Log Entries
 
-Once the raft log is committed,
-Ratis will notify state machine by invoking the `public CompletableFuture<Message> applyTransaction(TransactionContext trx)` method,
-and we need to override this method to decode the message and apply it. 
-
-First, get the log content and decode it:
+Once a raft log entry has been committed
+(i.e. a majority of the servers have acknowledged),
+Ratis notifies the state machine by invoking the `applyTransaction` method.
+The `applyTransaction` method first validates the log entry.
+Then, it applies the log entry by increasing the counter value and updates the term-index.
+The code fragments are shown below.
+Note that the `incrementCounter` method is synchronized
+in order to update both counter and last applied term-index atomically.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-        final RaftProtos.LogEntryProto entry = trx.getLogEntry();
-        String logData = entry.getStateMachineLogEntry().getLogData()
-                .toString(Charset.defaultCharset());
-        if (!logData.equals("INCREMENT")) {
-            return CompletableFuture.completedFuture(
-                    Message.valueOf("Invalid Command"));
-        }
-        // ...
+  // ...
+
+  private synchronized int incrementCounter(TermIndex termIndex) {
+    updateLastAppliedTermIndex(termIndex);
+    return counter.incrementAndGet();
+  }
+
+  // ...
+
+  /**
+   * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
+   *
+   * @param trx the transaction context
+   * @return the message containing the updated counter value
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    final LogEntryProto entry = trx.getLogEntry();
+
+    //check if the command is valid
+    final String command = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset());
+    if (!CounterCommand.INCREMENT.match(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
+    //increment the counter and update term-index
+    final TermIndex termIndex = TermIndex.valueOf(entry);
+    final long incremented = incrementCounter(termIndex);
+
+    //if leader, log the incremented value and the term-index
+    if (trx.getServerRole() == RaftPeerRole.LEADER) {
+      LOG.info("{}: Increment to {}", termIndex, incremented);
+    }
+
+    //return the new value of the counter to the client
+    return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
+  }
+
+  // ...
 }
 ```
 
-After that, if the log is valid,
-we could apply it by increasing the counter value.
-Remember that we also need to update the committed indexes:
+### Processing Readonly Commands
+The `INCREMENT` command is implemented in the previous section.
+What about the `GET` command?
+Since the `GET` command is a readonly command,
+it is implemented by the `query` method instead of the `applyTransaction` method.
+The code fragment is shown below.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-        // ...
-        final long index = entry.getIndex();
-        updateLastAppliedTermIndex(entry.getTerm(), index);
-
-        //actual execution of the command: increment the counter
-        counter.incrementAndGet();
-
-        //return the new value of the counter to the client
-        final CompletableFuture<Message> f =
-                CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
-        return f;
+  // ...
+
+  /**
+   * Process {@link CounterCommand#GET}, which gets the counter value.
+   *
+   * @param request the GET request
+   * @return a {@link Message} containing the current counter value as a {@link String}.
+   */
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    final String command = request.getContent().toString(Charset.defaultCharset());
+    if (!CounterCommand.GET.match(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
+    return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+  }
+
+  // ...
 }
 ```
 
-### Handle Readonly Command
-Note that we only handled `INCREMENT` command,
-what about the `GET` command?
-The `GET` command is implemented as a readonly command,
-so we'll need to implement `public CompletableFuture<Message> query(Message request)` instead of `applyTransaction`.
+### Taking Snapshots
+When taking a snapshot,
+the state is persisted in the storage of the state machine.
+The snapshot can be loaded for restoring the state in the future.
+In this example,
+we use `ObjectOutputStream` to write the counter value to a snapshot file.
+The term-index is stored in the file name of the snapshot file.
+The code fragments are shown below.
+Note that the `getState` method is synchronized
+in order to get the applied term-index and the counter value atomically.
+Note also that getting the counter value alone does not have to be synchronized
+since the `counter` field is already an `AtomicInteger`.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    @Override
-    public CompletableFuture<Message> query(Message request) {
-        String msg = request.getContent().toString(Charset.defaultCharset());
-        if (!msg.equals("GET")) {
-            return CompletableFuture.completedFuture(
-                    Message.valueOf("Invalid Command"));
-        }
-        return CompletableFuture.completedFuture(
-                Message.valueOf(counter.toString()));
+  // ...
+
+  /** The state of the {@link CounterStateMachine}. */
+  static class CounterState {
+    private final TermIndex applied;
+    private final int counter;
+
+    CounterState(TermIndex applied, int counter) {
+      this.applied = applied;
+      this.counter = counter;
+    }
+
+    TermIndex getApplied() {
+      return applied;
+    }
+
+    int getCounter() {
+      return counter;
+    }
+  }
+
+  // ...
+
+  /** @return the current state. */
+  private synchronized CounterState getState() {
+    return new CounterState(getLastAppliedTermIndex(), counter.get());
+  }
+
+  // ...
+
+  /**
+   * Store the current state as a snapshot file in the {@link #storage}.
+   *
+   * @return the index of the snapshot
+   */
+  @Override
+  public long takeSnapshot() {
+    //get the current state
+    final CounterState state = getState();
+    final long index = state.getApplied().getIndex();
+
+    //create a file with a proper name to store the snapshot
+    final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
+
+    //write the counter value into the snapshot file
+    try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
+        Files.newOutputStream(snapshotFile.toPath())))) {
+      out.writeInt(state.getCounter());
+    } catch (IOException ioe) {
+      LOG.warn("Failed to write snapshot file \"" + snapshotFile
+              + "\", last applied index=" + state.getApplied());
     }
+
+    //return the index of the stored snapshot (which is the last applied one)
+    return index;
+  }
+
+  // ...
 }
 ```
 
-### Save and Load Snapshots
-When taking a snapshot,
-we persist every state in the state machine,
-and the value could be loaded directly to the state in the future.
-In this example,
-the only state is the counter value,
-we're going to use `ObjectOutputStream` to write it to a snapshot file:
+### Loading Snapshots
+When loading a snapshot,
+we use an `ObjectInputStream` to read the snapshot file.
+The term-index is read from the file name of the snapshot file.
+The code fragments are shown below.
+Note that the `updateState` method is synchronized
+in order to update the applied term-index and the counter value atomically.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    @Override
-    public long takeSnapshot() {
-        //get the last applied index
-        final TermIndex last = getLastAppliedTermIndex();
-
-        //create a file with a proper name to store the snapshot
-        final File snapshotFile =
-                storage.getSnapshotFile(last.getTerm(), last.getIndex());
-
-        //serialize the counter object and write it into the snapshot file
-        try (ObjectOutputStream out = new ObjectOutputStream(
-                new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
-            out.writeObject(counter);
-        } catch (IOException ioe) {
-            LOG.warn("Failed to write snapshot file \"" + snapshotFile
-                    + "\", last applied index=" + last);
-        }
-
-        //return the index of the stored snapshot (which is the last applied one)
-        return last.getIndex();
+  // ...
+
+  private synchronized void updateState(TermIndex applied, int counterValue) {
+    updateLastAppliedTermIndex(applied);
+    counter.set(counterValue);
+  }
+
+  // ...
+
+  /**
+   * Load the state of the state machine from the {@link #storage}.
+   *
+   * @param snapshot the information of the snapshot being loaded
+   * @return the index of the snapshot or -1 if snapshot is invalid
+   * @throws IOException if it failed to read from storage
+   */
+  private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+    //check null
+    if (snapshot == null) {
+      LOG.warn("The snapshot info is null.");
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+    //check if the snapshot file exists.
+    final Path snapshotPath = snapshot.getFile().getPath();
+    if (!Files.exists(snapshotPath)) {
+      LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot);
+      return RaftLog.INVALID_LOG_INDEX;
     }
+
+    //read the TermIndex from the snapshot file name
+    final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());
+
+    //read the counter value from the snapshot file
+    final int counterValue;
+    try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) {
+      counterValue = in.readInt();
+    }
+
+    //update state
+    updateState(last, counterValue);
+
+    return last.getIndex();
+  }
+
+  // ...
 }
 ```
 
-When loading it,
-we could use `ObjectInputStream` to deserialize it.
-Remember that we also need to implement `initialize` and `reinitialize` method,
-so that the state machine will be correctly initialized.
-
-## Build and Start a RaftServer
-In order to build a raft cluster,
-each node must start a `RaftServer` instance,
-which is responsible for communicating to each other through Raft protocol.
+### Implementing the `initialize` and `reinitialize` methods.
+The `initialize` method is called at most once
+when the server is starting up.
+In contrast,
+the `reinitialize` method is called when
+1. the server is resumed from the `PAUSE` state, or
+2. a new snapshot is installed from the leader or from an external source.
+
+In `CounterStateMachine`,
+the `reinitialize` method simply loads the latest snapshot
+and the `initialize` method additionally initializes the `BaseStateMachine` super class and the storage.
+```java
+public class CounterStateMachine extends BaseStateMachine {
+  // ...
+
+  /**
+   * Initialize the state machine storage and then load the state.
+   *
+   * @param server  the server running this state machine
+   * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup}
+   * @param raftStorage the storage of the server
+   * @throws IOException if it fails to load the state.
+   */
+  @Override
+  public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
+    super.initialize(server, groupId, raftStorage);
+    storage.init(raftStorage);
+    reinitialize();
+  }
+
+  /**
+   * Simply load the state.
+   *
+   * @throws IOException if it fails to load the state.
+   */
+  @Override
+  public void reinitialize() throws IOException {
+    load(storage.getLatestSnapshot());
+  }
+
+  // ...
+}
+```
+## Preparing a `RaftGroup`
+In order to run a raft group,
+each server must start a `RaftServer` instance,
+which is responsible for communicating to each other through the Raft protocol.
 
 It's important to keep in mind that,
-each raft server knows exactly how many raft peers are in the cluster,
-and what are the addresses of them.
-In this example, we'll set a 3 node cluster.
+each raft server knows the initial raft group when starting up.
+They know the number of raft peers in the group
+and the addresses of the peers.
+
+In this example, we have a raft group with 3 peers.
 For simplicity,
-each peer listens to specific port on the same machine,
-and we can define the addresses of the cluster in a configuration file:
+each peer listens to a specific port on the same machine.
+The addresses of them are defined in a
+[property file](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/resources/conf.properties)
+as below.
 
 ```properties
 raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124
 ```
 
-We name those peers as 'n-0', 'n-1' and 'n-2',
-and then we will create a `RaftGroup` instance representing them.
-Since they are immutable,
-we'll put them in the `Constant` class:
+The peers are named as 'n0', 'n1' and 'n2'
+and they form a `RaftGroup`.
+For more details, see
+[Constants.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java).
 
-```java
-public final class Constants {
-    public static final List<RaftPeer> PEERS;
-    private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
-
-    static {
-        // load addresses from configuration file
-        // final String[] addresses = ...
-        List<RaftPeer> peers = new ArrayList<>(addresses.length);
-        for (int i = 0; i < addresses.length; i++) {
-            peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).build());
-        }
-        PEERS = Collections.unmodifiableList(peers);
-    }
-    
-    public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
-            RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS);
-    // ...
-}
-```
+## Building & Starting the `CounterServer`
 
-Except for the cluster info,
-another important thing is that we need to know the information of the current peer.
-To achieve this,
-we could pass the current peer's id as a program argument,
-and then the raft server could be created:
+We use a `RaftServer.Builder` to build a `RaftServer`.
+We first set up a `RaftProperties` object
+with a local directory as the storage of the server
+and a port number as the gRPC server port.
+Then,
+we create our `CounterStateMachine`
+and pass everything to the builder as below.
 
 ```java
 public final class CounterServer implements Closeable {
-    private final RaftServer server;
-
-    // the current peer will be passed as argument
-    public CounterServer(RaftPeer peer, File storageDir) throws IOException {
-        // ...
-        CounterStateMachine counterStateMachine = new CounterStateMachine();
-
-        //create and start the Raft server
-        this.server = RaftServer.newBuilder()
-                .setGroup(Constants.RAFT_GROUP)
-                .setProperties(properties)
-                .setServerId(peer.getId())
-                .setStateMachine(counterStateMachine)
-                .build();
-    }
+  private final RaftServer server;
 
-    public void start() throws IOException {
-        server.start();
-    }
-}
-```
+  public CounterServer(RaftPeer peer, File storageDir) throws IOException {
+    //create a property object
+    final RaftProperties properties = new RaftProperties();
 
-Each `RaftServer` will own a `CounterStateMachine` instance,
-as previously defined by us.
-After that, all we need to do is to start it along with our application:
+    //set the storage directory (different for each peer) in the RaftProperty object
+    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
 
-```java
-public final class CounterServer implements Closeable {
-    // ...
-    public static void main(String[] args) throws IOException {
-        // ...
-        //find current peer object based on application parameter
-        final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1);
-
-        //start a counter server
-        final File storageDir = new File("./" + currentPeer.getId());
-        final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
-        counterServer.start();
-        // ...
-    }
+    //set the port (different for each peer) in RaftProperty object
+    final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
+    GrpcConfigKeys.Server.setPort(properties, port);
+
+    //create the counter state machine which holds the counter value
+    final CounterStateMachine counterStateMachine = new CounterStateMachine();
+
+    //build the Raft server
+    this.server = RaftServer.newBuilder()
+            .setGroup(Constants.RAFT_GROUP)
+            .setProperties(properties)
+            .setServerId(peer.getId())
+            .setStateMachine(counterStateMachine)
+            .build();
+  }
 
+  // ...
 }
 ```
 
-After the server is started,
-it will try to communicate with other peers in the cluster,
-and perform raft actions like leader election, append log entries, etc.
+Now we are ready to start our `CounterServer` peers and form a raft group.
+The command is:
+```shell
+java org.apache.ratis.examples.counter.server.CounterServer peer_index
+```
+The argument `peer_index` must be 0, 1 or 2.
+
+After a server is started,
+it communicates with other peers in the group,
+and performs raft actions such as leader election and append-log-entries.
+After all three servers are started,
+the counter service is up and running with the Raft protocol.
 
-## Build Raft Client
+For more details, see
+[CounterServer.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java).
 
-To send commands to the cluster,
-we need to use a `RaftClient` instance.
-All we need to know is the peers in the cluster, ie. the raft group. 
+## Building & Running the `CounterClient`
+
+We use a `RaftGroup` to build a `RaftClient`
+and then use the `RaftClient` to send commands to the raft service.
+Note that gRPC is the default RPC type
+so that we may skip setting it in the `RaftProperties`.
 
 ```java
-public final class CounterClient {
-    // ...
-    private static RaftClient buildClient() {
-        RaftProperties raftProperties = new RaftProperties();
-        RaftClient.Builder builder = RaftClient.newBuilder()
-                .setProperties(raftProperties)
-                .setRaftGroup(Constants.RAFT_GROUP)
-                .setClientRpc(
-                        new GrpcFactory(new Parameters())
-                                .newRaftClientRpc(ClientId.randomId(), raftProperties));
-        return builder.build();
-    }
+public final class CounterClient implements Closeable {
+  private final RaftClient client = RaftClient.newBuilder()
+          .setProperties(new RaftProperties())
+          .setRaftGroup(Constants.RAFT_GROUP)
+          .build();
+
+  // ...
 }
 ```
 
 With this raft client,
-we can then send commands by `raftClient.io().send` method,
-and use `raftClient.io().sendReadonly` method for read only commands.
-In this example,
-to send `INCREMENT` and `GET` command,
-we can do it like this:
-
+we can then send commands using the `BlockingApi` returned by `RaftClient.io()`,
+or the `AsyncApi` returned by `RaftClient.async()`.
+The `send` method in the `BlockingApi`/`AsyncApi` is used to send the `INCREMENT` command as below.
 ```java
-raftClient.io().send(Message.valueOf("INCREMENT")));
+client.io().send(CounterCommand.INCREMENT.getMessage());
 ```
-
-and
-
+or
 ```java
-RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
-String response = count.getMessage().getContent().toString(Charset.defaultCharset());
-System.out.println(response);
+client.async().send(CounterCommand.INCREMENT.getMessage());
 ```
-
-## Summary
-It might seem a little complicated for beginners,
-but since Raft itself is a hard topic,
-this is already the simplest example we've found as a 'Hello World' for Ratis.
-After you have a basic understanding of Ratis,
-you'll find it really easy to be integrated into any projects. 
-
-Next, you can take a look at other [examples](https://github.com/apache/ratis/tree/master/ratis-examples),
-to know more about the features of Ratis.
\ No newline at end of file
+The `sendReadonly` method in the `BlockingApi`/`AsyncApi` is used to send the `GET` command as below.
+```java
+client.io().sendReadOnly(CounterCommand.GET.getMessage());
+```
+or
+```java
+client.async().sendReadOnly(CounterCommand.GET.getMessage());
+```
+For more details, see
+[CounterClient.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java).
\ No newline at end of file
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
index 9da64092d..9fa873fc2 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
@@ -71,10 +71,9 @@ public final class Constants {
     PEERS = Collections.unmodifiableList(peers);
   }
 
-  private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
+  private static final UUID GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
 
-  public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
-      RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS);
+  public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(RaftGroupId.valueOf(Constants.GROUP_ID), PEERS);
 
   private Constants() {
   }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
new file mode 100644
index 000000000..843a158fb
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.counter;
+
+import org.apache.ratis.protocol.Message;
+
+/**
+ * The supported commands the Counter example.
+ */
+public enum CounterCommand {
+  /** Increment the counter by 1. */
+  INCREMENT,
+  /** Get the counter value. */
+  GET;
+
+  private final Message message = Message.valueOf(name());
+
+  public Message getMessage() {
+    return message;
+  }
+
+  /** Does the given command string match this command? */
+  public boolean matches(String command) {
+    return name().equalsIgnoreCase(command);
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index a3f28cb6a..c0350ec72 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,24 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.examples.counter.client;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.examples.common.Constants;
-import org.apache.ratis.grpc.GrpcFactory;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
+import org.apache.ratis.examples.counter.CounterCommand;
 import org.apache.ratis.protocol.RaftClientReply;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
 /**
  * Counter client application, this application sends specific number of
@@ -42,55 +40,75 @@ import java.util.concurrent.TimeUnit;
  * Parameter to this application indicate the number of INCREMENT command, if no
  * parameter found, application use default value which is 10
  */
-public final class CounterClient {
+public final class CounterClient implements Closeable {
+  //build the client
+  private final RaftClient client = RaftClient.newBuilder()
+      .setProperties(new RaftProperties())
+      .setRaftGroup(Constants.RAFT_GROUP)
+      .build();
 
-  private CounterClient(){
+  @Override
+  public void close() throws IOException {
+    client.close();
   }
 
-  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
-  public static void main(String[] args)
-      throws IOException, InterruptedException {
-    //indicate the number of INCREMENT command, set 10 if no parameter passed
-    int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
-
-    //build the counter cluster client
-    RaftClient raftClient = buildClient();
+  private void run(int increment, boolean blocking) throws Exception {
+    System.out.printf("Sending %d %s command(s) using the %s ...%n",
+        increment, CounterCommand.INCREMENT, blocking? "BlockingApi": "AsyncApi");
+    final List<Future<RaftClientReply>> futures = new ArrayList<>(increment);
 
-    //use a executor service with 10 thread to send INCREMENT commands
-    // concurrently
-    ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-    //send INCREMENT commands concurrently
-    System.out.printf("Sending %d increment command...%n", increment);
-    for (int i = 0; i < increment; i++) {
-      executorService.submit(() ->
-          raftClient.io().send(Message.valueOf("INCREMENT")));
+    //send INCREMENT command(s)
+    if (blocking) {
+      // use BlockingApi
+      final ExecutorService executor = Executors.newFixedThreadPool(10);
+      for (int i = 0; i < increment; i++) {
+        final Future<RaftClientReply> f = executor.submit(
+            () -> client.io().send(CounterCommand.INCREMENT.getMessage()));
+        futures.add(f);
+      }
+      executor.shutdown();
+    } else {
+      // use AsyncApi
+      for (int i = 0; i < increment; i++) {
+        final Future<RaftClientReply> f = client.async().send(CounterCommand.INCREMENT.getMessage());
+        futures.add(f);
+      }
     }
 
-    //shutdown the executor service and wait until they finish their work
-    executorService.shutdown();
-    executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
+    //wait for the futures
+    for (Future<RaftClientReply> f : futures) {
+      final RaftClientReply reply = f.get();
+      if (reply.isSuccess()) {
+        final String count = reply.getMessage().getContent().toStringUtf8();
+        System.out.println("Counter is incremented to " + count);
+      } else {
+        System.err.println("Failed " + reply);
+      }
+    }
 
-    //send GET command and print the response
-    RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
-    String response = count.getMessage().getContent().toString(Charset.defaultCharset());
-    System.out.println(response);
+    //send a GET command and print the reply
+    final RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage());
+    final String count = reply.getMessage().getContent().toStringUtf8();
+    System.out.println("Current counter value: " + count);
   }
 
-  /**
-   * build the RaftClient instance which is used to communicate to
-   * Counter cluster
-   *
-   * @return the created client of Counter cluster
-   */
-  private static RaftClient buildClient() {
-    RaftProperties raftProperties = new RaftProperties();
-    RaftClient.Builder builder = RaftClient.newBuilder()
-        .setProperties(raftProperties)
-        .setRaftGroup(Constants.RAFT_GROUP)
-        .setClientRpc(
-            new GrpcFactory(new Parameters())
-                .newRaftClientRpc(ClientId.randomId(), raftProperties));
-    return builder.build();
+  public static void main(String[] args) {
+    try(CounterClient client = new CounterClient()) {
+      //the number of INCREMENT commands, default is 10
+      final int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
+      final boolean io = args.length > 1 && "io".equalsIgnoreCase(args[1]);
+      client.run(increment, io);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      System.err.println();
+      System.err.println("args = " + Arrays.toString(args));
+      System.err.println();
+      System.err.println("Usage: java org.apache.ratis.examples.counter.client.CounterClient [increment] [async|io]");
+      System.err.println();
+      System.err.println("       increment: the number of INCREMENT commands to be sent (default is 10)");
+      System.err.println("       async    : use the AsyncApi (default)");
+      System.err.println("       io       : use the BlockingApi");
+      System.exit(1);
+    }
   }
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
index b6fc8c7d8..7a6367ece 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.examples.counter.server;
 
 import org.apache.ratis.conf.RaftProperties;
@@ -29,6 +28,7 @@ import org.apache.ratis.util.NetUtils;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Scanner;
 
@@ -48,19 +48,19 @@ public final class CounterServer implements Closeable {
 
   public CounterServer(RaftPeer peer, File storageDir) throws IOException {
     //create a property object
-    RaftProperties properties = new RaftProperties();
+    final RaftProperties properties = new RaftProperties();
 
-    //set the storage directory (different for each peer) in RaftProperty object
+    //set the storage directory (different for each peer) in the RaftProperty object
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
 
-    //set the port which server listen to in RaftProperty object
+    //set the port (different for each peer) in RaftProperty object
     final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
     GrpcConfigKeys.Server.setPort(properties, port);
 
-    //create the counter state machine which hold the counter value
-    CounterStateMachine counterStateMachine = new CounterStateMachine();
+    //create the counter state machine which holds the counter value
+    final CounterStateMachine counterStateMachine = new CounterStateMachine();
 
-    //create and start the Raft server
+    //build the Raft server
     this.server = RaftServer.newBuilder()
         .setGroup(Constants.RAFT_GROUP)
         .setProperties(properties)
@@ -78,24 +78,41 @@ public final class CounterServer implements Closeable {
     server.close();
   }
 
-  public static void main(String[] args) throws IOException {
-    if (args.length < 1) {
-      System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
-      System.err.println("{serverIndex} could be 1, 2 or 3");
+  public static void main(String[] args) {
+    try {
+      //get peerIndex from the arguments
+      if (args.length != 1) {
+        throw new IllegalArgumentException("Invalid argument number: expected to be 1 but actual is " + args.length);
+      }
+      final int peerIndex = Integer.parseInt(args[0]);
+      if (peerIndex < 0 || peerIndex > 2) {
+        throw new IllegalArgumentException("The server index must be 0, 1 or 2: peerIndex=" + peerIndex);
+      }
+
+      startServer(peerIndex);
+    } catch(Throwable e) {
+      e.printStackTrace();
+      System.err.println();
+      System.err.println("args = " + Arrays.toString(args));
+      System.err.println();
+      System.err.println("Usage: java org.apache.ratis.examples.counter.server.CounterServer peer_index");
+      System.err.println();
+      System.err.println("       peer_index must be 0, 1 or 2");
       System.exit(1);
     }
+  }
 
-    //find current peer object based on application parameter
-    final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1);
+  private static void startServer(int peerIndex) throws IOException {
+    //get peer and define storage dir
+    final RaftPeer currentPeer = Constants.PEERS.get(peerIndex);
+    final File storageDir = new File("./" + currentPeer.getId());
 
     //start a counter server
-    final File storageDir = new File("./" + currentPeer.getId());
-    final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
-    counterServer.start();
+    try(CounterServer counterServer = new CounterServer(currentPeer, storageDir)) {
+      counterServer.start();
 
-    //exit when any input entered
-    Scanner scanner = new Scanner(System.in, UTF_8.name());
-    scanner.nextLine();
-    counterServer.close();
+      //exit when any input entered
+      new Scanner(System.in, UTF_8.name()).nextLine();
+    }
   }
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index 7159ec1c5..d5a027910 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,10 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.examples.counter.server;
 
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
@@ -34,52 +35,81 @@ import org.apache.ratis.util.JavaUtils;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * State machine implementation for Counter server application. This class
- * maintain a {@link AtomicInteger} object as a state and accept two commands:
- * GET and INCREMENT, GET is a ReadOnly command which will be handled by
- * {@code query} method however INCREMENT is a transactional command which
- * will be handled by {@code applyTransaction}.
+ * A {@link org.apache.ratis.statemachine.StateMachine} implementation for the {@link CounterServer}.
+ * This class maintain a {@link AtomicInteger} object as a state and accept two commands:
+ *
+ * - {@link CounterCommand#GET} is a readonly command
+ *   which is handled by the {@link #query(Message)} method.
+ *
+ * - {@link CounterCommand#INCREMENT} is a transactional command
+ *   which is handled by the {@link #applyTransaction(TransactionContext)} method.
  */
 public class CounterStateMachine extends BaseStateMachine {
-  private final SimpleStateMachineStorage storage =
-      new SimpleStateMachineStorage();
-  private AtomicInteger counter = new AtomicInteger(0);
+  /** The state of the {@link CounterStateMachine}. */
+  static class CounterState {
+    private final TermIndex applied;
+    private final int counter;
+
+    CounterState(TermIndex applied, int counter) {
+      this.applied = applied;
+      this.counter = counter;
+    }
+
+    TermIndex getApplied() {
+      return applied;
+    }
+
+    int getCounter() {
+      return counter;
+    }
+  }
+
+  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final AtomicInteger counter = new AtomicInteger(0);
+
+  /** @return the current state. */
+  private synchronized CounterState getState() {
+    return new CounterState(getLastAppliedTermIndex(), counter.get());
+  }
+
+  private synchronized void updateState(TermIndex applied, int counterValue) {
+    updateLastAppliedTermIndex(applied);
+    counter.set(counterValue);
+  }
+
+  private synchronized int incrementCounter(TermIndex termIndex) {
+    updateLastAppliedTermIndex(termIndex);
+    return counter.incrementAndGet();
+  }
 
   /**
-   * initialize the state machine by initilize the state machine storage and
-   * calling the load method which reads the last applied command and restore it
-   * in counter object)
+   * Initialize the state machine storage and then load the state.
    *
-   * @param server      the current server information
-   * @param groupId     the cluster groupId
-   * @param raftStorage the raft storage which is used to keep raft related
-   *                    stuff
-   * @throws IOException if any error happens during load state
+   * @param server  the server running this state machine
+   * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup}
+   * @param raftStorage the storage of the server
+   * @throws IOException if it fails to load the state.
    */
   @Override
-  public void initialize(RaftServer server, RaftGroupId groupId,
-                         RaftStorage raftStorage) throws IOException {
+  public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
     super.initialize(server, groupId, raftStorage);
-    this.storage.init(raftStorage);
-    load(storage.getLatestSnapshot());
+    storage.init(raftStorage);
+    reinitialize();
   }
 
   /**
-   * very similar to initialize method, but doesn't initialize the storage
-   * system because the state machine reinitialized from the PAUSE state and
-   * storage system initialized before.
+   * Simply load the latest snapshot.
    *
-   * @throws IOException if any error happens during load state
+   * @throws IOException if it fails to load the state.
    */
   @Override
   public void reinitialize() throws IOException {
@@ -87,124 +117,107 @@ public class CounterStateMachine extends BaseStateMachine {
   }
 
   /**
-   * Store the current state as an snapshot file in the stateMachineStorage.
+   * Store the current state as a snapshot file in the {@link #storage}.
    *
    * @return the index of the snapshot
    */
   @Override
   public long takeSnapshot() {
-    //get the last applied index
-    final TermIndex last = getLastAppliedTermIndex();
+    //get the current state
+    final CounterState state = getState();
+    final long index = state.getApplied().getIndex();
 
     //create a file with a proper name to store the snapshot
-    final File snapshotFile =
-        storage.getSnapshotFile(last.getTerm(), last.getIndex());
+    final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
 
-    //serialize the counter object and write it into the snapshot file
-    try (ObjectOutputStream out = new ObjectOutputStream(
-        new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
-      out.writeObject(counter);
+    //write the counter value into the snapshot file
+    try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
+        Files.newOutputStream(snapshotFile.toPath())))) {
+      out.writeInt(state.getCounter());
     } catch (IOException ioe) {
       LOG.warn("Failed to write snapshot file \"" + snapshotFile
-          + "\", last applied index=" + last);
+          + "\", last applied index=" + state.getApplied());
     }
 
     //return the index of the stored snapshot (which is the last applied one)
-    return last.getIndex();
+    return index;
   }
 
   /**
-   * Load the state of the state machine from the storage.
+   * Load the state of the state machine from the {@link #storage}.
    *
-   * @param snapshot to load
+   * @param snapshot the information of the snapshot being loaded
    * @return the index of the snapshot or -1 if snapshot is invalid
-   * @throws IOException if any error happens during read from storage
+   * @throws IOException if it failed to read from storage
    */
   private long load(SingleFileSnapshotInfo snapshot) throws IOException {
-    //check the snapshot nullity
+    //check null
     if (snapshot == null) {
       LOG.warn("The snapshot info is null.");
       return RaftLog.INVALID_LOG_INDEX;
     }
-
-    //check the existance of the snapshot file
-    final File snapshotFile = snapshot.getFile().getPath().toFile();
-    if (!snapshotFile.exists()) {
-      LOG.warn("The snapshot file {} does not exist for snapshot {}",
-          snapshotFile, snapshot);
+    //check if the snapshot file exists.
+    final Path snapshotPath = snapshot.getFile().getPath();
+    if (!Files.exists(snapshotPath)) {
+      LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot);
       return RaftLog.INVALID_LOG_INDEX;
     }
 
-    //load the TermIndex object for the snapshot using the file name pattern of
-    // the snapshot
-    final TermIndex last =
-        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
-
-    //read the file and cast it to the AtomicInteger and set the counter
-    try (ObjectInputStream in = new ObjectInputStream(
-        new BufferedInputStream(new FileInputStream(snapshotFile)))) {
-      //set the last applied termIndex to the termIndex of the snapshot
-      setLastAppliedTermIndex(last);
-
-      //read, cast and set the counter
-      counter = JavaUtils.cast(in.readObject());
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException(e);
+    //read the TermIndex from the snapshot file name
+    final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());
+
+    //read the counter value from the snapshot file
+    final int counterValue;
+    try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) {
+      counterValue = in.readInt();
     }
 
+    //update state
+    updateState(last, counterValue);
+
     return last.getIndex();
   }
 
   /**
-   * Handle GET command, which used by clients to get the counter value.
+   * Process {@link CounterCommand#GET}, which gets the counter value.
    *
-   * @param request the GET message
-   * @return the Message containing the current counter value
+   * @param request the GET request
+   * @return a {@link Message} containing the current counter value as a {@link String}.
    */
   @Override
   public CompletableFuture<Message> query(Message request) {
-    String msg = request.getContent().toString(Charset.defaultCharset());
-    if (!msg.equals("GET")) {
-      return CompletableFuture.completedFuture(
-          Message.valueOf("Invalid Command"));
+    final String command = request.getContent().toStringUtf8();
+    if (!CounterCommand.GET.matches(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
-    return CompletableFuture.completedFuture(
-        Message.valueOf(counter.toString()));
+    return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
   }
 
   /**
-   * Apply the INCREMENT command by incrementing the counter object.
+   * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
    *
    * @param trx the transaction context
    * @return the message containing the updated counter value
    */
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+    final LogEntryProto entry = trx.getLogEntry();
 
     //check if the command is valid
-    String logData = entry.getStateMachineLogEntry().getLogData()
-        .toString(Charset.defaultCharset());
-    if (!logData.equals("INCREMENT")) {
-      return CompletableFuture.completedFuture(
-          Message.valueOf("Invalid Command"));
+    final String command = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
+    if (!CounterCommand.INCREMENT.matches(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
-    //update the last applied term and index
-    final long index = entry.getIndex();
-    updateLastAppliedTermIndex(entry.getTerm(), index);
-
-    //actual execution of the command: increment the counter
-    counter.incrementAndGet();
+    //increment the counter and update term-index
+    final TermIndex termIndex = TermIndex.valueOf(entry);
+    final long incremented = incrementCounter(termIndex);
 
-    //return the new value of the counter to the client
-    final CompletableFuture<Message> f =
-        CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
-
-    //if leader, log the incremented value and it's log index
-    if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) {
-      LOG.info("{}: Increment to {}", index, counter.toString());
+    //if leader, log the incremented value and the term-index
+    if (trx.getServerRole() == RaftPeerRole.LEADER) {
+      LOG.info("{}: Increment to {}", termIndex, incremented);
     }
 
-    return f;
+    //return the new value of the counter to the client
+    return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
   }
 }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
index e63789117..953fd5bfa 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.Collection;
 
 public class TestCounter extends ParameterizedBaseTest {
@@ -50,20 +49,17 @@ public class TestCounter extends ParameterizedBaseTest {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply1 = client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("10",
-          reply1.getMessage().getContent().toString(Charset.defaultCharset()));
+      Assert.assertEquals("10", reply1.getMessage().getContent().toStringUtf8());
       for (int i = 0; i < 10; i++) {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply2 = client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("20",
-          reply2.getMessage().getContent().toString(Charset.defaultCharset()));
+      Assert.assertEquals("20", reply2.getMessage().getContent().toStringUtf8());
       for (int i = 0; i < 10; i++) {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply3 = client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("30",
-          reply3.getMessage().getContent().toString(Charset.defaultCharset()));
+      Assert.assertEquals("30", reply3.getMessage().getContent().toStringUtf8());
     }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 9cc4c5b21..629a55a67 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -132,9 +132,12 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
     updateLastAppliedTermIndex(term, index);
   }
 
-  @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
   protected boolean updateLastAppliedTermIndex(long term, long index) {
-    final TermIndex newTI = TermIndex.valueOf(term, index);
+    return updateLastAppliedTermIndex(TermIndex.valueOf(term, index));
+  }
+
+  protected boolean updateLastAppliedTermIndex(TermIndex newTI) {
+    Objects.requireNonNull(newTI, "newTI == null");
     final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI);
     if (!newTI.equals(oldTI)) {
       LOG.trace("{}: update lastAppliedTermIndex from {} to {}", getId(), oldTI, newTI);
@@ -147,7 +150,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
     }
 
     synchronized (transactionFutures) {
-      for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= index; ) {
+      for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= newTI.getIndex(); ) {
         transactionFutures.remove(i).complete(null);
       }
     }