You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2022/08/07 15:12:56 UTC

[GitHub] [ratis] codings-dan commented on a diff in pull request #698: RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc.

codings-dan commented on code in PR #698:
URL: https://github.com/apache/ratis/pull/698#discussion_r939680560


##########
ratis-docs/src/site/markdown/start/index.md:
##########
@@ -61,294 +90,436 @@ For example, let's use grpc transport:
 Please note that Apache Hadoop dependencies are shaded,
 so it’s safe to use hadoop transport with different versions of Hadoop.
 
-## Create the StateMachine
-A state machine is used to maintain the current state of the raft node,
-the state machine is responsible for:
-
-* Execute raft logs to get the state. In this example, when a `INCREMENT` log is executed, 
-the counter value will be increased by 1.
-And a `GET` log does not affect the state but only returns the current counter value to the client.
-* Managing snapshots loading/saving.
-Snapshots are used to speed the log execution,
-the state machine could start from a snapshot point and only execute newer logs.
-
-### Define the StateMachine
-To define our state machine,
-we can extend a class from the base class `BaseStateMachine`.
-
-Also, a storage is needed to store snapshots,
-and we'll use the build-in `SimpleStateMachineStorage`,
-which is a file-based storage implementation.
-
-Since we're going to implement a count server,
-the `counter` instance is defined in the state machine,
-represents the current value.
-Below is the declaration of the state machine: 
+## Implementing the `CounterStateMachine`
+A state machine manages the application logic.
+The state machine is responsible for:
+
+* Apply raft log transactions in order to maintain the current state.
+  * When there is an `INCREMENT` request,
+    it will first be written to the raft log as a log entry.
+    Once the log entry is committed,
+    the state machine will be invoked for applying the log entry as a transaction
+    so that the counter value will be increased by 1.
+  * When there is a `GET` request,
+    it will not be written to the raft log
+    since it is a readonly request which does not change the state.
+    The state machine should return the current value of the counter.
+* Manage snapshots loading/saving.
+  * Snapshots are used for log compaction
+    so that the state machine can be restored from a snapshot
+    and then applies only the newer log entries,
+    instead of applying a long history of log starting from the beginning.
+
+We discuss how to implement `CounterStateMachine` in the following subsections.
+The complete source code of it is in
+[CounterStateMachine.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java).
+
+### Defining the State
+In this example,
+the `CounterStateMachine` extends the `BaseStateMachine`,
+which provides a base implementation of a `StateMachine`.
+
+Inside the `CounterStateMachine`,
+there is a `counter` object
+which stores the current value.
+The `counter` is an `AtomicInteger`
+in order to support concurrent access.
+We use the build-in `SimpleStateMachineStorage`,
+which is a file-based storage implementation,
+as a storage for storing snapshots.
+The fields are shown below:
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    private final SimpleStateMachineStorage storage =
-            new SimpleStateMachineStorage();
-    private AtomicInteger counter = new AtomicInteger(0);
-    // ...
+  // ...
+
+  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final AtomicInteger counter = new AtomicInteger(0);
+
+  // ...
 }
 ```
 
-### Apply Raft Log Item
+### Applying Raft Log Entries
 
-Once the raft log is committed,
-Ratis will notify state machine by invoking the `public CompletableFuture<Message> applyTransaction(TransactionContext trx)` method,
-and we need to override this method to decode the message and apply it. 
-
-First, get the log content and decode it:
+Once a raft log entry has been committed
+(i.e. a majority of the servers have acknowledged),
+Ratis notifies the state machine by invoking the `applyTransaction` method.
+The `applyTransaction` method first validates the log entry.
+Then, it applies the log entry by increasing the counter value and updates the term-index.
+The code fragments are shown below.
+Note that the `incrementCounter` method is synchronized
+in order to update both counter and last applied term-index atomically.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-        final RaftProtos.LogEntryProto entry = trx.getLogEntry();
-        String logData = entry.getStateMachineLogEntry().getLogData()
-                .toString(Charset.defaultCharset());
-        if (!logData.equals("INCREMENT")) {
-            return CompletableFuture.completedFuture(
-                    Message.valueOf("Invalid Command"));
-        }
-        // ...
+  // ...
+
+  private synchronized int incrementCounter(TermIndex termIndex) {
+    updateLastAppliedTermIndex(termIndex);
+    return counter.incrementAndGet();
+  }
+
+  // ...
+
+  /**
+   * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
+   *
+   * @param trx the transaction context
+   * @return the message containing the updated counter value
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    final LogEntryProto entry = trx.getLogEntry();
+
+    //check if the command is valid
+    final String command = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset());
+    if (!CounterCommand.INCREMENT.match(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
+    //increment the counter and update term-index
+    final TermIndex termIndex = TermIndex.valueOf(entry);
+    final long incremented = incrementCounter(termIndex);
+
+    //if leader, log the incremented value and the term-index
+    if (trx.getServerRole() == RaftPeerRole.LEADER) {
+      LOG.info("{}: Increment to {}", termIndex, incremented);
+    }
+
+    //return the new value of the counter to the client
+    return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
+  }
+
+  // ...
 }
 ```
 
-After that, if the log is valid,
-we could apply it by increasing the counter value.
-Remember that we also need to update the committed indexes:
+### Processing Readonly Commands
+The `INCREMENT` command is implemented in the previous section.
+What about the `GET` command?
+Since the `GET` command is a readonly command,
+it is implemented by the `query` method instead of the `applyTransaction` method.
+The code fragment is shown below.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-        // ...
-        final long index = entry.getIndex();
-        updateLastAppliedTermIndex(entry.getTerm(), index);
-
-        //actual execution of the command: increment the counter
-        counter.incrementAndGet();
-
-        //return the new value of the counter to the client
-        final CompletableFuture<Message> f =
-                CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
-        return f;
+  // ...
+
+  /**
+   * Process {@link CounterCommand#GET}, which gets the counter value.
+   *
+   * @param request the GET request
+   * @return a {@link Message} containing the current counter value as a {@link String}.
+   */
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    final String command = request.getContent().toString(Charset.defaultCharset());
+    if (!CounterCommand.GET.match(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
+    return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+  }
+
+  // ...
 }
 ```
 
-### Handle Readonly Command
-Note that we only handled `INCREMENT` command,
-what about the `GET` command?
-The `GET` command is implemented as a readonly command,
-so we'll need to implement `public CompletableFuture<Message> query(Message request)` instead of `applyTransaction`.
+### Taking Snapshots
+When taking a snapshot,
+the state is persisted in the storage of the state machine.
+The snapshot can be loaded for restoring the state in the future.
+In this example,
+we use `ObjectOutputStream` to write the counter value to a snapshot file.
+The term-index is stored in the file name of the snapshot file.
+The code fragments are shown below.
+Note that the `getState` method is synchronized
+in order to get the applied term-index and the counter value atomically.
+Note also that getting the counter value alone does not have to be synchronized
+since the `counter` field is already an `AtomicInteger`.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    @Override
-    public CompletableFuture<Message> query(Message request) {
-        String msg = request.getContent().toString(Charset.defaultCharset());
-        if (!msg.equals("GET")) {
-            return CompletableFuture.completedFuture(
-                    Message.valueOf("Invalid Command"));
-        }
-        return CompletableFuture.completedFuture(
-                Message.valueOf(counter.toString()));
+  // ...
+
+  /** The state of the {@link CounterStateMachine}. */
+  static class CounterState {
+    private final TermIndex applied;
+    private final int counter;
+
+    CounterState(TermIndex applied, int counter) {
+      this.applied = applied;
+      this.counter = counter;
+    }
+
+    TermIndex getApplied() {
+      return applied;
+    }
+
+    int getCounter() {
+      return counter;
+    }
+  }
+
+  // ...
+
+  /** @return the current state. */
+  private synchronized CounterState getState() {
+    return new CounterState(getLastAppliedTermIndex(), counter.get());
+  }
+
+  // ...
+
+  /**
+   * Store the current state as a snapshot file in the {@link #storage}.
+   *
+   * @return the index of the snapshot
+   */
+  @Override
+  public long takeSnapshot() {
+    //get the current state
+    final CounterState state = getState();
+    final long index = state.getApplied().getIndex();
+
+    //create a file with a proper name to store the snapshot
+    final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);

Review Comment:
   `getSnapshotFile` is easily misunderstood. Since it is creating a snapshot file, I suggest changing to `createSnapshotFile`.



##########
ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java:
##########
@@ -78,24 +78,41 @@ public void close() throws IOException {
     server.close();
   }
 
-  public static void main(String[] args) throws IOException {
-    if (args.length < 1) {
-      System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
-      System.err.println("{serverIndex} could be 1, 2 or 3");
+  public static void main(String[] args) {
+    try {
+      //get peerIndex from the arguments
+      if (args.length == 1) {

Review Comment:
   This judgment condition seems to be wrong,args.length != 1 



##########
ratis-docs/src/site/markdown/start/index.md:
##########
@@ -43,13 +71,14 @@ it's available in maven central:
 </dependency>
 ```
 
-Also, one of the following transports need to be added:
+Then, add one of the following transports:
 
-* grpc
-* netty
-* hadoop
+* ratis-grpc
+* ratis-netty
+* ratis-hadoop

Review Comment:
   Ratis no longer supports `ratis-hadoop`, we need to remove the line or indicate the information about the supported version



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ratis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org