You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/09 01:57:52 UTC
[ratis] branch master updated: RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1009a6c38 RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)
1009a6c38 is described below
commit 1009a6c385373b3280cc9066e02ce050911627f8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Aug 8 18:57:47 2022 -0700
RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)
* RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc.
* Revert ArithmeticStateMachine.java
* Add an option to use AsyncApi
* Fix a long line
* Minor changes in CounterClient.
* Remove the use of Charset.
* Revert some CounterServer change for a findbugs warning.
* Fix a bug in CounterServer.main.
---
ratis-docs/src/site/markdown/start/index.md | 653 +++++++++++++--------
.../apache/ratis/examples/common/Constants.java | 5 +-
.../ratis/examples/counter/CounterCommand.java | 41 ++
.../examples/counter/client/CounterClient.java | 118 ++--
.../examples/counter/server/CounterServer.java | 59 +-
.../counter/server/CounterStateMachine.java | 205 ++++---
.../apache/ratis/examples/counter/TestCounter.java | 10 +-
.../ratis/statemachine/impl/BaseStateMachine.java | 9 +-
8 files changed, 679 insertions(+), 421 deletions(-)
diff --git a/ratis-docs/src/site/markdown/start/index.md b/ratis-docs/src/site/markdown/start/index.md
index 7aca21f9b..beaa519ea 100644
--- a/ratis-docs/src/site/markdown/start/index.md
+++ b/ratis-docs/src/site/markdown/start/index.md
@@ -18,23 +18,51 @@
# Getting Started
Let's get started to use Raft in your application.
To demonstrate how to use Ratis,
-we'll implement a simple counter service,
-which maintains a counter value across a cluster.
-The client could send the following types of commands to the cluster:
+we implement a simple *Counter* service,
+which maintains a counter value across a raft group.
+Clients could send the following types of requests to the raft group:
-* `INCREMENT`: increase the counter value
-* `GET`: query the current value of the counter,
-we call such kind of commands as read-only commands
+* `INCREMENT`: increase the counter value by 1.
+This command will trigger a transaction to change the state.
+* `GET`: query the current value of the counter.
+This is a read-only command since it does not change the state.
-Note: The full source could be found at [Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples).
-This article is mainly intended to show the steps of integration of Ratis,
-if you wish to run this example or find more examples,
+We have the following `enum` for representing the supported commands.
+
+```java
+/**
+ * The supported commands the Counter example.
+ */
+public enum CounterCommand {
+ /** Increment the counter by 1. */
+ INCREMENT,
+ /** Get the counter value. */
+ GET;
+
+ private final Message message = Message.valueOf(name());
+
+ public Message getMessage() {
+ return message;
+ }
+
+ /** Does the given command string match this command? */
+ public boolean matches(String command) {
+ return name().equalsIgnoreCase(command);
+ }
+}
+```
+
+Note:
+The source code of the Counter example and the other examples is at
+[Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples).
+This article intends to show the steps of integration of Ratis.
+If you wish to run the Counter example
please refer to [the README](https://github.com/apache/ratis/tree/master/ratis-examples#example-3-counter).
-## Add the dependency
+## Adding the Dependency
-First, we need to add Ratis dependencies into the project,
-it's available in maven central:
+The first step is to add Ratis dependencies into the project.
+The dependencies are available in maven central:
```xml
<dependency>
@@ -43,13 +71,14 @@ it's available in maven central:
</dependency>
```
-Also, one of the following transports need to be added:
+Then, add one of the following transports:
-* grpc
-* netty
-* hadoop
+* ratis-grpc
+* ratis-netty
+* ratis-hadoop
-For example, let's use grpc transport:
+In this example,
+we choose to use ratis-grpc:
```xml
<dependency>
@@ -61,294 +90,436 @@ For example, let's use grpc transport:
Please note that Apache Hadoop dependencies are shaded,
so it’s safe to use hadoop transport with different versions of Hadoop.
-## Create the StateMachine
-A state machine is used to maintain the current state of the raft node,
-the state machine is responsible for:
-
-* Execute raft logs to get the state. In this example, when a `INCREMENT` log is executed,
-the counter value will be increased by 1.
-And a `GET` log does not affect the state but only returns the current counter value to the client.
-* Managing snapshots loading/saving.
-Snapshots are used to speed the log execution,
-the state machine could start from a snapshot point and only execute newer logs.
-
-### Define the StateMachine
-To define our state machine,
-we can extend a class from the base class `BaseStateMachine`.
-
-Also, a storage is needed to store snapshots,
-and we'll use the build-in `SimpleStateMachineStorage`,
-which is a file-based storage implementation.
-
-Since we're going to implement a count server,
-the `counter` instance is defined in the state machine,
-represents the current value.
-Below is the declaration of the state machine:
+## Implementing the `CounterStateMachine`
+A state machine manages the application logic.
+The state machine is responsible for:
+
+* Apply raft log transactions in order to maintain the current state.
+ * When there is an `INCREMENT` request,
+ it will first be written to the raft log as a log entry.
+ Once the log entry is committed,
+ the state machine will be invoked for applying the log entry as a transaction
+ so that the counter value will be increased by 1.
+ * When there is a `GET` request,
+ it will not be written to the raft log
+ since it is a readonly request which does not change the state.
+ The state machine should return the current value of the counter.
+* Manage snapshots loading/saving.
+ * Snapshots are used for log compaction
+ so that the state machine can be restored from a snapshot
+ and then applies only the newer log entries,
+ instead of applying a long history of log starting from the beginning.
+
+We discuss how to implement `CounterStateMachine` in the following subsections.
+The complete source code of it is in
+[CounterStateMachine.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java).
+
+### Defining the State
+In this example,
+the `CounterStateMachine` extends the `BaseStateMachine`,
+which provides a base implementation of a `StateMachine`.
+
+Inside the `CounterStateMachine`,
+there is a `counter` object
+which stores the current value.
+The `counter` is an `AtomicInteger`
+in order to support concurrent access.
+We use the build-in `SimpleStateMachineStorage`,
+which is a file-based storage implementation,
+as a storage for storing snapshots.
+The fields are shown below:
```java
public class CounterStateMachine extends BaseStateMachine {
- private final SimpleStateMachineStorage storage =
- new SimpleStateMachineStorage();
- private AtomicInteger counter = new AtomicInteger(0);
- // ...
+ // ...
+
+ private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ // ...
}
```
-### Apply Raft Log Item
+### Applying Raft Log Entries
-Once the raft log is committed,
-Ratis will notify state machine by invoking the `public CompletableFuture<Message> applyTransaction(TransactionContext trx)` method,
-and we need to override this method to decode the message and apply it.
-
-First, get the log content and decode it:
+Once a raft log entry has been committed
+(i.e. a majority of the servers have acknowledged),
+Ratis notifies the state machine by invoking the `applyTransaction` method.
+The `applyTransaction` method first validates the log entry.
+Then, it applies the log entry by increasing the counter value and updates the term-index.
+The code fragments are shown below.
+Note that the `incrementCounter` method is synchronized
+in order to update both counter and last applied term-index atomically.
```java
public class CounterStateMachine extends BaseStateMachine {
- // ...
- public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
- final RaftProtos.LogEntryProto entry = trx.getLogEntry();
- String logData = entry.getStateMachineLogEntry().getLogData()
- .toString(Charset.defaultCharset());
- if (!logData.equals("INCREMENT")) {
- return CompletableFuture.completedFuture(
- Message.valueOf("Invalid Command"));
- }
- // ...
+ // ...
+
+ private synchronized int incrementCounter(TermIndex termIndex) {
+ updateLastAppliedTermIndex(termIndex);
+ return counter.incrementAndGet();
+ }
+
+ // ...
+
+ /**
+ * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
+ *
+ * @param trx the transaction context
+ * @return the message containing the updated counter value
+ */
+ @Override
+ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+ final LogEntryProto entry = trx.getLogEntry();
+
+ //check if the command is valid
+ final String command = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset());
+ if (!CounterCommand.INCREMENT.match(command)) {
+ return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
}
+ //increment the counter and update term-index
+ final TermIndex termIndex = TermIndex.valueOf(entry);
+ final long incremented = incrementCounter(termIndex);
+
+ //if leader, log the incremented value and the term-index
+ if (trx.getServerRole() == RaftPeerRole.LEADER) {
+ LOG.info("{}: Increment to {}", termIndex, incremented);
+ }
+
+ //return the new value of the counter to the client
+ return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
+ }
+
+ // ...
}
```
-After that, if the log is valid,
-we could apply it by increasing the counter value.
-Remember that we also need to update the committed indexes:
+### Processing Readonly Commands
+The `INCREMENT` command is implemented in the previous section.
+What about the `GET` command?
+Since the `GET` command is a readonly command,
+it is implemented by the `query` method instead of the `applyTransaction` method.
+The code fragment is shown below.
```java
public class CounterStateMachine extends BaseStateMachine {
- // ...
- public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
- // ...
- final long index = entry.getIndex();
- updateLastAppliedTermIndex(entry.getTerm(), index);
-
- //actual execution of the command: increment the counter
- counter.incrementAndGet();
-
- //return the new value of the counter to the client
- final CompletableFuture<Message> f =
- CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
- return f;
+ // ...
+
+ /**
+ * Process {@link CounterCommand#GET}, which gets the counter value.
+ *
+ * @param request the GET request
+ * @return a {@link Message} containing the current counter value as a {@link String}.
+ */
+ @Override
+ public CompletableFuture<Message> query(Message request) {
+ final String command = request.getContent().toString(Charset.defaultCharset());
+ if (!CounterCommand.GET.match(command)) {
+ return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
}
+ return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+ }
+
+ // ...
}
```
-### Handle Readonly Command
-Note that we only handled `INCREMENT` command,
-what about the `GET` command?
-The `GET` command is implemented as a readonly command,
-so we'll need to implement `public CompletableFuture<Message> query(Message request)` instead of `applyTransaction`.
+### Taking Snapshots
+When taking a snapshot,
+the state is persisted in the storage of the state machine.
+The snapshot can be loaded for restoring the state in the future.
+In this example,
+we use `ObjectOutputStream` to write the counter value to a snapshot file.
+The term-index is stored in the file name of the snapshot file.
+The code fragments are shown below.
+Note that the `getState` method is synchronized
+in order to get the applied term-index and the counter value atomically.
+Note also that getting the counter value alone does not have to be synchronized
+since the `counter` field is already an `AtomicInteger`.
```java
public class CounterStateMachine extends BaseStateMachine {
- // ...
- @Override
- public CompletableFuture<Message> query(Message request) {
- String msg = request.getContent().toString(Charset.defaultCharset());
- if (!msg.equals("GET")) {
- return CompletableFuture.completedFuture(
- Message.valueOf("Invalid Command"));
- }
- return CompletableFuture.completedFuture(
- Message.valueOf(counter.toString()));
+ // ...
+
+ /** The state of the {@link CounterStateMachine}. */
+ static class CounterState {
+ private final TermIndex applied;
+ private final int counter;
+
+ CounterState(TermIndex applied, int counter) {
+ this.applied = applied;
+ this.counter = counter;
+ }
+
+ TermIndex getApplied() {
+ return applied;
+ }
+
+ int getCounter() {
+ return counter;
+ }
+ }
+
+ // ...
+
+ /** @return the current state. */
+ private synchronized CounterState getState() {
+ return new CounterState(getLastAppliedTermIndex(), counter.get());
+ }
+
+ // ...
+
+ /**
+ * Store the current state as a snapshot file in the {@link #storage}.
+ *
+ * @return the index of the snapshot
+ */
+ @Override
+ public long takeSnapshot() {
+ //get the current state
+ final CounterState state = getState();
+ final long index = state.getApplied().getIndex();
+
+ //create a file with a proper name to store the snapshot
+ final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
+
+ //write the counter value into the snapshot file
+ try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
+ Files.newOutputStream(snapshotFile.toPath())))) {
+ out.writeInt(state.getCounter());
+ } catch (IOException ioe) {
+ LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ + "\", last applied index=" + state.getApplied());
}
+
+ //return the index of the stored snapshot (which is the last applied one)
+ return index;
+ }
+
+ // ...
}
```
-### Save and Load Snapshots
-When taking a snapshot,
-we persist every state in the state machine,
-and the value could be loaded directly to the state in the future.
-In this example,
-the only state is the counter value,
-we're going to use `ObjectOutputStream` to write it to a snapshot file:
+### Loading Snapshots
+When loading a snapshot,
+we use an `ObjectInputStream` to read the snapshot file.
+The term-index is read from the file name of the snapshot file.
+The code fragments are shown below.
+Note that the `updateState` method is synchronized
+in order to update the applied term-index and the counter value atomically.
```java
public class CounterStateMachine extends BaseStateMachine {
- // ...
- @Override
- public long takeSnapshot() {
- //get the last applied index
- final TermIndex last = getLastAppliedTermIndex();
-
- //create a file with a proper name to store the snapshot
- final File snapshotFile =
- storage.getSnapshotFile(last.getTerm(), last.getIndex());
-
- //serialize the counter object and write it into the snapshot file
- try (ObjectOutputStream out = new ObjectOutputStream(
- new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
- out.writeObject(counter);
- } catch (IOException ioe) {
- LOG.warn("Failed to write snapshot file \"" + snapshotFile
- + "\", last applied index=" + last);
- }
-
- //return the index of the stored snapshot (which is the last applied one)
- return last.getIndex();
+ // ...
+
+ private synchronized void updateState(TermIndex applied, int counterValue) {
+ updateLastAppliedTermIndex(applied);
+ counter.set(counterValue);
+ }
+
+ // ...
+
+ /**
+ * Load the state of the state machine from the {@link #storage}.
+ *
+ * @param snapshot the information of the snapshot being loaded
+ * @return the index of the snapshot or -1 if snapshot is invalid
+ * @throws IOException if it failed to read from storage
+ */
+ private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+ //check null
+ if (snapshot == null) {
+ LOG.warn("The snapshot info is null.");
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+ //check if the snapshot file exists.
+ final Path snapshotPath = snapshot.getFile().getPath();
+ if (!Files.exists(snapshotPath)) {
+ LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot);
+ return RaftLog.INVALID_LOG_INDEX;
}
+
+ //read the TermIndex from the snapshot file name
+ final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());
+
+ //read the counter value from the snapshot file
+ final int counterValue;
+ try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) {
+ counterValue = in.readInt();
+ }
+
+ //update state
+ updateState(last, counterValue);
+
+ return last.getIndex();
+ }
+
+ // ...
}
```
-When loading it,
-we could use `ObjectInputStream` to deserialize it.
-Remember that we also need to implement `initialize` and `reinitialize` method,
-so that the state machine will be correctly initialized.
-
-## Build and Start a RaftServer
-In order to build a raft cluster,
-each node must start a `RaftServer` instance,
-which is responsible for communicating to each other through Raft protocol.
+### Implementing the `initialize` and `reinitialize` methods.
+The `initialize` method is called at most once
+when the server is starting up.
+In contrast,
+the `reinitialize` method is called when
+1. the server is resumed from the `PAUSE` state, or
+2. a new snapshot is installed from the leader or from an external source.
+
+In `CounterStateMachine`,
+the `reinitialize` method simply loads the latest snapshot
+and the `initialize` method additionally initializes the `BaseStateMachine` super class and the storage.
+```java
+public class CounterStateMachine extends BaseStateMachine {
+ // ...
+
+ /**
+ * Initialize the state machine storage and then load the state.
+ *
+ * @param server the server running this state machine
+ * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup}
+ * @param raftStorage the storage of the server
+ * @throws IOException if it fails to load the state.
+ */
+ @Override
+ public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
+ super.initialize(server, groupId, raftStorage);
+ storage.init(raftStorage);
+ reinitialize();
+ }
+
+ /**
+ * Simply load the state.
+ *
+ * @throws IOException if it fails to load the state.
+ */
+ @Override
+ public void reinitialize() throws IOException {
+ load(storage.getLatestSnapshot());
+ }
+
+ // ...
+}
+```
+## Preparing a `RaftGroup`
+In order to run a raft group,
+each server must start a `RaftServer` instance,
+which is responsible for communicating to each other through the Raft protocol.
It's important to keep in mind that,
-each raft server knows exactly how many raft peers are in the cluster,
-and what are the addresses of them.
-In this example, we'll set a 3 node cluster.
+each raft server knows the initial raft group when starting up.
+They know the number of raft peers in the group
+and the addresses of the peers.
+
+In this example, we have a raft group with 3 peers.
For simplicity,
-each peer listens to specific port on the same machine,
-and we can define the addresses of the cluster in a configuration file:
+each peer listens to a specific port on the same machine.
+The addresses of them are defined in a
+[property file](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/resources/conf.properties)
+as below.
```properties
raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124
```
-We name those peers as 'n-0', 'n-1' and 'n-2',
-and then we will create a `RaftGroup` instance representing them.
-Since they are immutable,
-we'll put them in the `Constant` class:
+The peers are named as 'n0', 'n1' and 'n2'
+and they form a `RaftGroup`.
+For more details, see
+[Constants.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java).
-```java
-public final class Constants {
- public static final List<RaftPeer> PEERS;
- private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
-
- static {
- // load addresses from configuration file
- // final String[] addresses = ...
- List<RaftPeer> peers = new ArrayList<>(addresses.length);
- for (int i = 0; i < addresses.length; i++) {
- peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).build());
- }
- PEERS = Collections.unmodifiableList(peers);
- }
-
- public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
- RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS);
- // ...
-}
-```
+## Building & Starting the `CounterServer`
-Except for the cluster info,
-another important thing is that we need to know the information of the current peer.
-To achieve this,
-we could pass the current peer's id as a program argument,
-and then the raft server could be created:
+We use a `RaftServer.Builder` to build a `RaftServer`.
+We first set up a `RaftProperties` object
+with a local directory as the storage of the server
+and a port number as the gRPC server port.
+Then,
+we create our `CounterStateMachine`
+and pass everything to the builder as below.
```java
public final class CounterServer implements Closeable {
- private final RaftServer server;
-
- // the current peer will be passed as argument
- public CounterServer(RaftPeer peer, File storageDir) throws IOException {
- // ...
- CounterStateMachine counterStateMachine = new CounterStateMachine();
-
- //create and start the Raft server
- this.server = RaftServer.newBuilder()
- .setGroup(Constants.RAFT_GROUP)
- .setProperties(properties)
- .setServerId(peer.getId())
- .setStateMachine(counterStateMachine)
- .build();
- }
+ private final RaftServer server;
- public void start() throws IOException {
- server.start();
- }
-}
-```
+ public CounterServer(RaftPeer peer, File storageDir) throws IOException {
+ //create a property object
+ final RaftProperties properties = new RaftProperties();
-Each `RaftServer` will own a `CounterStateMachine` instance,
-as previously defined by us.
-After that, all we need to do is to start it along with our application:
+ //set the storage directory (different for each peer) in the RaftProperty object
+ RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
-```java
-public final class CounterServer implements Closeable {
- // ...
- public static void main(String[] args) throws IOException {
- // ...
- //find current peer object based on application parameter
- final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1);
-
- //start a counter server
- final File storageDir = new File("./" + currentPeer.getId());
- final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
- counterServer.start();
- // ...
- }
+ //set the port (different for each peer) in RaftProperty object
+ final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
+ GrpcConfigKeys.Server.setPort(properties, port);
+
+ //create the counter state machine which holds the counter value
+ final CounterStateMachine counterStateMachine = new CounterStateMachine();
+
+ //build the Raft server
+ this.server = RaftServer.newBuilder()
+ .setGroup(Constants.RAFT_GROUP)
+ .setProperties(properties)
+ .setServerId(peer.getId())
+ .setStateMachine(counterStateMachine)
+ .build();
+ }
+ // ...
}
```
-After the server is started,
-it will try to communicate with other peers in the cluster,
-and perform raft actions like leader election, append log entries, etc.
+Now we are ready to start our `CounterServer` peers and form a raft group.
+The command is:
+```shell
+java org.apache.ratis.examples.counter.server.CounterServer peer_index
+```
+The argument `peer_index` must be 0, 1 or 2.
+
+After a server is started,
+it communicates with other peers in the group,
+and performs raft actions such as leader election and append-log-entries.
+After all three servers are started,
+the counter service is up and running with the Raft protocol.
-## Build Raft Client
+For more details, see
+[CounterServer.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java).
-To send commands to the cluster,
-we need to use a `RaftClient` instance.
-All we need to know is the peers in the cluster, ie. the raft group.
+## Building & Running the `CounterClient`
+
+We use a `RaftGroup` to build a `RaftClient`
+and then use the `RaftClient` to send commands to the raft service.
+Note that gRPC is the default RPC type
+so that we may skip setting it in the `RaftProperties`.
```java
-public final class CounterClient {
- // ...
- private static RaftClient buildClient() {
- RaftProperties raftProperties = new RaftProperties();
- RaftClient.Builder builder = RaftClient.newBuilder()
- .setProperties(raftProperties)
- .setRaftGroup(Constants.RAFT_GROUP)
- .setClientRpc(
- new GrpcFactory(new Parameters())
- .newRaftClientRpc(ClientId.randomId(), raftProperties));
- return builder.build();
- }
+public final class CounterClient implements Closeable {
+ private final RaftClient client = RaftClient.newBuilder()
+ .setProperties(new RaftProperties())
+ .setRaftGroup(Constants.RAFT_GROUP)
+ .build();
+
+ // ...
}
```
With this raft client,
-we can then send commands by `raftClient.io().send` method,
-and use `raftClient.io().sendReadonly` method for read only commands.
-In this example,
-to send `INCREMENT` and `GET` command,
-we can do it like this:
-
+we can then send commands using the `BlockingApi` returned by `RaftClient.io()`,
+or the `AsyncApi` returned by `RaftClient.async()`.
+The `send` method in the `BlockingApi`/`AsyncApi` is used to send the `INCREMENT` command as below.
```java
-raftClient.io().send(Message.valueOf("INCREMENT")));
+client.io().send(CounterCommand.INCREMENT.getMessage());
```
-
-and
-
+or
```java
-RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
-String response = count.getMessage().getContent().toString(Charset.defaultCharset());
-System.out.println(response);
+client.async().send(CounterCommand.INCREMENT.getMessage());
```
-
-## Summary
-It might seem a little complicated for beginners,
-but since Raft itself is a hard topic,
-this is already the simplest example we've found as a 'Hello World' for Ratis.
-After you have a basic understanding of Ratis,
-you'll find it really easy to be integrated into any projects.
-
-Next, you can take a look at other [examples](https://github.com/apache/ratis/tree/master/ratis-examples),
-to know more about the features of Ratis.
\ No newline at end of file
+The `sendReadonly` method in the `BlockingApi`/`AsyncApi` is used to send the `GET` command as below.
+```java
+client.io().sendReadOnly(CounterCommand.GET.getMessage());
+```
+or
+```java
+client.async().sendReadOnly(CounterCommand.GET.getMessage());
+```
+For more details, see
+[CounterClient.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java).
\ No newline at end of file
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
index 9da64092d..9fa873fc2 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
@@ -71,10 +71,9 @@ public final class Constants {
PEERS = Collections.unmodifiableList(peers);
}
- private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
+ private static final UUID GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
- public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
- RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS);
+ public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(RaftGroupId.valueOf(Constants.GROUP_ID), PEERS);
private Constants() {
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
new file mode 100644
index 000000000..843a158fb
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.counter;
+
+import org.apache.ratis.protocol.Message;
+
+/**
+ * The supported commands the Counter example.
+ */
+public enum CounterCommand {
+ /** Increment the counter by 1. */
+ INCREMENT,
+ /** Get the counter value. */
+ GET;
+
+ private final Message message = Message.valueOf(name());
+
+ public Message getMessage() {
+ return message;
+ }
+
+ /** Does the given command string match this command? */
+ public boolean matches(String command) {
+ return name().equalsIgnoreCase(command);
+ }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index a3f28cb6a..c0350ec72 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,24 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ratis.examples.counter.client;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.examples.common.Constants;
-import org.apache.ratis.grpc.GrpcFactory;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
+import org.apache.ratis.examples.counter.CounterCommand;
import org.apache.ratis.protocol.RaftClientReply;
+import java.io.Closeable;
import java.io.IOException;
-import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
/**
* Counter client application, this application sends specific number of
@@ -42,55 +40,75 @@ import java.util.concurrent.TimeUnit;
* Parameter to this application indicate the number of INCREMENT command, if no
* parameter found, application use default value which is 10
*/
-public final class CounterClient {
+public final class CounterClient implements Closeable {
+ //build the client
+ private final RaftClient client = RaftClient.newBuilder()
+ .setProperties(new RaftProperties())
+ .setRaftGroup(Constants.RAFT_GROUP)
+ .build();
- private CounterClient(){
+ @Override
+ public void close() throws IOException {
+ client.close();
}
- @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
- public static void main(String[] args)
- throws IOException, InterruptedException {
- //indicate the number of INCREMENT command, set 10 if no parameter passed
- int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
-
- //build the counter cluster client
- RaftClient raftClient = buildClient();
+ private void run(int increment, boolean blocking) throws Exception {
+ System.out.printf("Sending %d %s command(s) using the %s ...%n",
+ increment, CounterCommand.INCREMENT, blocking? "BlockingApi": "AsyncApi");
+ final List<Future<RaftClientReply>> futures = new ArrayList<>(increment);
- //use a executor service with 10 thread to send INCREMENT commands
- // concurrently
- ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- //send INCREMENT commands concurrently
- System.out.printf("Sending %d increment command...%n", increment);
- for (int i = 0; i < increment; i++) {
- executorService.submit(() ->
- raftClient.io().send(Message.valueOf("INCREMENT")));
+ //send INCREMENT command(s)
+ if (blocking) {
+ // use BlockingApi
+ final ExecutorService executor = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < increment; i++) {
+ final Future<RaftClientReply> f = executor.submit(
+ () -> client.io().send(CounterCommand.INCREMENT.getMessage()));
+ futures.add(f);
+ }
+ executor.shutdown();
+ } else {
+ // use AsyncApi
+ for (int i = 0; i < increment; i++) {
+ final Future<RaftClientReply> f = client.async().send(CounterCommand.INCREMENT.getMessage());
+ futures.add(f);
+ }
}
- //shutdown the executor service and wait until they finish their work
- executorService.shutdown();
- executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
+ //wait for the futures
+ for (Future<RaftClientReply> f : futures) {
+ final RaftClientReply reply = f.get();
+ if (reply.isSuccess()) {
+ final String count = reply.getMessage().getContent().toStringUtf8();
+ System.out.println("Counter is incremented to " + count);
+ } else {
+ System.err.println("Failed " + reply);
+ }
+ }
- //send GET command and print the response
- RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
- String response = count.getMessage().getContent().toString(Charset.defaultCharset());
- System.out.println(response);
+ //send a GET command and print the reply
+ final RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage());
+ final String count = reply.getMessage().getContent().toStringUtf8();
+ System.out.println("Current counter value: " + count);
}
- /**
- * build the RaftClient instance which is used to communicate to
- * Counter cluster
- *
- * @return the created client of Counter cluster
- */
- private static RaftClient buildClient() {
- RaftProperties raftProperties = new RaftProperties();
- RaftClient.Builder builder = RaftClient.newBuilder()
- .setProperties(raftProperties)
- .setRaftGroup(Constants.RAFT_GROUP)
- .setClientRpc(
- new GrpcFactory(new Parameters())
- .newRaftClientRpc(ClientId.randomId(), raftProperties));
- return builder.build();
+ public static void main(String[] args) {
+ try(CounterClient client = new CounterClient()) {
+ //the number of INCREMENT commands, default is 10
+ final int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
+ final boolean io = args.length > 1 && "io".equalsIgnoreCase(args[1]);
+ client.run(increment, io);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.err.println();
+ System.err.println("args = " + Arrays.toString(args));
+ System.err.println();
+ System.err.println("Usage: java org.apache.ratis.examples.counter.client.CounterClient [increment] [async|io]");
+ System.err.println();
+ System.err.println(" increment: the number of INCREMENT commands to be sent (default is 10)");
+ System.err.println(" async : use the AsyncApi (default)");
+ System.err.println(" io : use the BlockingApi");
+ System.exit(1);
+ }
}
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
index b6fc8c7d8..7a6367ece 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ratis.examples.counter.server;
import org.apache.ratis.conf.RaftProperties;
@@ -29,6 +28,7 @@ import org.apache.ratis.util.NetUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Scanner;
@@ -48,19 +48,19 @@ public final class CounterServer implements Closeable {
public CounterServer(RaftPeer peer, File storageDir) throws IOException {
//create a property object
- RaftProperties properties = new RaftProperties();
+ final RaftProperties properties = new RaftProperties();
- //set the storage directory (different for each peer) in RaftProperty object
+ //set the storage directory (different for each peer) in the RaftProperty object
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
- //set the port which server listen to in RaftProperty object
+ //set the port (different for each peer) in RaftProperty object
final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port);
- //create the counter state machine which hold the counter value
- CounterStateMachine counterStateMachine = new CounterStateMachine();
+ //create the counter state machine which holds the counter value
+ final CounterStateMachine counterStateMachine = new CounterStateMachine();
- //create and start the Raft server
+ //build the Raft server
this.server = RaftServer.newBuilder()
.setGroup(Constants.RAFT_GROUP)
.setProperties(properties)
@@ -78,24 +78,41 @@ public final class CounterServer implements Closeable {
server.close();
}
- public static void main(String[] args) throws IOException {
- if (args.length < 1) {
- System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
- System.err.println("{serverIndex} could be 1, 2 or 3");
+ public static void main(String[] args) {
+ try {
+ //get peerIndex from the arguments
+ if (args.length != 1) {
+ throw new IllegalArgumentException("Invalid argument number: expected to be 1 but actual is " + args.length);
+ }
+ final int peerIndex = Integer.parseInt(args[0]);
+ if (peerIndex < 0 || peerIndex > 2) {
+ throw new IllegalArgumentException("The server index must be 0, 1 or 2: peerIndex=" + peerIndex);
+ }
+
+ startServer(peerIndex);
+ } catch(Throwable e) {
+ e.printStackTrace();
+ System.err.println();
+ System.err.println("args = " + Arrays.toString(args));
+ System.err.println();
+ System.err.println("Usage: java org.apache.ratis.examples.counter.server.CounterServer peer_index");
+ System.err.println();
+ System.err.println(" peer_index must be 0, 1 or 2");
System.exit(1);
}
+ }
- //find current peer object based on application parameter
- final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1);
+ private static void startServer(int peerIndex) throws IOException {
+ //get peer and define storage dir
+ final RaftPeer currentPeer = Constants.PEERS.get(peerIndex);
+ final File storageDir = new File("./" + currentPeer.getId());
//start a counter server
- final File storageDir = new File("./" + currentPeer.getId());
- final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
- counterServer.start();
+ try(CounterServer counterServer = new CounterServer(currentPeer, storageDir)) {
+ counterServer.start();
- //exit when any input entered
- Scanner scanner = new Scanner(System.in, UTF_8.name());
- scanner.nextLine();
- counterServer.close();
+ //exit when any input entered
+ new Scanner(System.in, UTF_8.name()).nextLine();
+ }
}
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index 7159ec1c5..d5a027910 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,10 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ratis.examples.counter.server;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
@@ -34,52 +35,81 @@ import org.apache.ratis.util.JavaUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * State machine implementation for Counter server application. This class
- * maintain a {@link AtomicInteger} object as a state and accept two commands:
- * GET and INCREMENT, GET is a ReadOnly command which will be handled by
- * {@code query} method however INCREMENT is a transactional command which
- * will be handled by {@code applyTransaction}.
+ * A {@link org.apache.ratis.statemachine.StateMachine} implementation for the {@link CounterServer}.
+ * This class maintain a {@link AtomicInteger} object as a state and accept two commands:
+ *
+ * - {@link CounterCommand#GET} is a readonly command
+ * which is handled by the {@link #query(Message)} method.
+ *
+ * - {@link CounterCommand#INCREMENT} is a transactional command
+ * which is handled by the {@link #applyTransaction(TransactionContext)} method.
*/
public class CounterStateMachine extends BaseStateMachine {
- private final SimpleStateMachineStorage storage =
- new SimpleStateMachineStorage();
- private AtomicInteger counter = new AtomicInteger(0);
+ /** The state of the {@link CounterStateMachine}. */
+ static class CounterState {
+ private final TermIndex applied;
+ private final int counter;
+
+ CounterState(TermIndex applied, int counter) {
+ this.applied = applied;
+ this.counter = counter;
+ }
+
+ TermIndex getApplied() {
+ return applied;
+ }
+
+ int getCounter() {
+ return counter;
+ }
+ }
+
+ private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ /** @return the current state. */
+ private synchronized CounterState getState() {
+ return new CounterState(getLastAppliedTermIndex(), counter.get());
+ }
+
+ private synchronized void updateState(TermIndex applied, int counterValue) {
+ updateLastAppliedTermIndex(applied);
+ counter.set(counterValue);
+ }
+
+ private synchronized int incrementCounter(TermIndex termIndex) {
+ updateLastAppliedTermIndex(termIndex);
+ return counter.incrementAndGet();
+ }
/**
- * initialize the state machine by initilize the state machine storage and
- * calling the load method which reads the last applied command and restore it
- * in counter object)
+ * Initialize the state machine storage and then load the state.
*
- * @param server the current server information
- * @param groupId the cluster groupId
- * @param raftStorage the raft storage which is used to keep raft related
- * stuff
- * @throws IOException if any error happens during load state
+ * @param server the server running this state machine
+ * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup}
+ * @param raftStorage the storage of the server
+ * @throws IOException if it fails to load the state.
*/
@Override
- public void initialize(RaftServer server, RaftGroupId groupId,
- RaftStorage raftStorage) throws IOException {
+ public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
super.initialize(server, groupId, raftStorage);
- this.storage.init(raftStorage);
- load(storage.getLatestSnapshot());
+ storage.init(raftStorage);
+ reinitialize();
}
/**
- * very similar to initialize method, but doesn't initialize the storage
- * system because the state machine reinitialized from the PAUSE state and
- * storage system initialized before.
+ * Simply load the latest snapshot.
*
- * @throws IOException if any error happens during load state
+ * @throws IOException if it fails to load the state.
*/
@Override
public void reinitialize() throws IOException {
@@ -87,124 +117,107 @@ public class CounterStateMachine extends BaseStateMachine {
}
/**
- * Store the current state as an snapshot file in the stateMachineStorage.
+ * Store the current state as a snapshot file in the {@link #storage}.
*
* @return the index of the snapshot
*/
@Override
public long takeSnapshot() {
- //get the last applied index
- final TermIndex last = getLastAppliedTermIndex();
+ //get the current state
+ final CounterState state = getState();
+ final long index = state.getApplied().getIndex();
//create a file with a proper name to store the snapshot
- final File snapshotFile =
- storage.getSnapshotFile(last.getTerm(), last.getIndex());
+ final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
- //serialize the counter object and write it into the snapshot file
- try (ObjectOutputStream out = new ObjectOutputStream(
- new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
- out.writeObject(counter);
+ //write the counter value into the snapshot file
+ try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
+ Files.newOutputStream(snapshotFile.toPath())))) {
+ out.writeInt(state.getCounter());
} catch (IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
- + "\", last applied index=" + last);
+ + "\", last applied index=" + state.getApplied());
}
//return the index of the stored snapshot (which is the last applied one)
- return last.getIndex();
+ return index;
}
/**
- * Load the state of the state machine from the storage.
+ * Load the state of the state machine from the {@link #storage}.
*
- * @param snapshot to load
+ * @param snapshot the information of the snapshot being loaded
* @return the index of the snapshot or -1 if snapshot is invalid
- * @throws IOException if any error happens during read from storage
+ * @throws IOException if it failed to read from storage
*/
private long load(SingleFileSnapshotInfo snapshot) throws IOException {
- //check the snapshot nullity
+ //check null
if (snapshot == null) {
LOG.warn("The snapshot info is null.");
return RaftLog.INVALID_LOG_INDEX;
}
-
- //check the existance of the snapshot file
- final File snapshotFile = snapshot.getFile().getPath().toFile();
- if (!snapshotFile.exists()) {
- LOG.warn("The snapshot file {} does not exist for snapshot {}",
- snapshotFile, snapshot);
+ //check if the snapshot file exists.
+ final Path snapshotPath = snapshot.getFile().getPath();
+ if (!Files.exists(snapshotPath)) {
+ LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot);
return RaftLog.INVALID_LOG_INDEX;
}
- //load the TermIndex object for the snapshot using the file name pattern of
- // the snapshot
- final TermIndex last =
- SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
-
- //read the file and cast it to the AtomicInteger and set the counter
- try (ObjectInputStream in = new ObjectInputStream(
- new BufferedInputStream(new FileInputStream(snapshotFile)))) {
- //set the last applied termIndex to the termIndex of the snapshot
- setLastAppliedTermIndex(last);
-
- //read, cast and set the counter
- counter = JavaUtils.cast(in.readObject());
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException(e);
+ //read the TermIndex from the snapshot file name
+ final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());
+
+ //read the counter value from the snapshot file
+ final int counterValue;
+ try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) {
+ counterValue = in.readInt();
}
+ //update state
+ updateState(last, counterValue);
+
return last.getIndex();
}
/**
- * Handle GET command, which used by clients to get the counter value.
+ * Process {@link CounterCommand#GET}, which gets the counter value.
*
- * @param request the GET message
- * @return the Message containing the current counter value
+ * @param request the GET request
+ * @return a {@link Message} containing the current counter value as a {@link String}.
*/
@Override
public CompletableFuture<Message> query(Message request) {
- String msg = request.getContent().toString(Charset.defaultCharset());
- if (!msg.equals("GET")) {
- return CompletableFuture.completedFuture(
- Message.valueOf("Invalid Command"));
+ final String command = request.getContent().toStringUtf8();
+ if (!CounterCommand.GET.matches(command)) {
+ return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
}
- return CompletableFuture.completedFuture(
- Message.valueOf(counter.toString()));
+ return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
}
/**
- * Apply the INCREMENT command by incrementing the counter object.
+ * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
*
* @param trx the transaction context
* @return the message containing the updated counter value
*/
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
- final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+ final LogEntryProto entry = trx.getLogEntry();
//check if the command is valid
- String logData = entry.getStateMachineLogEntry().getLogData()
- .toString(Charset.defaultCharset());
- if (!logData.equals("INCREMENT")) {
- return CompletableFuture.completedFuture(
- Message.valueOf("Invalid Command"));
+ final String command = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
+ if (!CounterCommand.INCREMENT.matches(command)) {
+ return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
}
- //update the last applied term and index
- final long index = entry.getIndex();
- updateLastAppliedTermIndex(entry.getTerm(), index);
-
- //actual execution of the command: increment the counter
- counter.incrementAndGet();
+ //increment the counter and update term-index
+ final TermIndex termIndex = TermIndex.valueOf(entry);
+ final long incremented = incrementCounter(termIndex);
- //return the new value of the counter to the client
- final CompletableFuture<Message> f =
- CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
-
- //if leader, log the incremented value and it's log index
- if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) {
- LOG.info("{}: Increment to {}", index, counter.toString());
+ //if leader, log the incremented value and the term-index
+ if (trx.getServerRole() == RaftPeerRole.LEADER) {
+ LOG.info("{}: Increment to {}", termIndex, incremented);
}
- return f;
+ //return the new value of the counter to the client
+ return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
}
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
index e63789117..953fd5bfa 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -29,7 +29,6 @@ import org.junit.Test;
import org.junit.runners.Parameterized;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.util.Collection;
public class TestCounter extends ParameterizedBaseTest {
@@ -50,20 +49,17 @@ public class TestCounter extends ParameterizedBaseTest {
client.io().send(Message.valueOf("INCREMENT"));
}
RaftClientReply reply1 = client.io().sendReadOnly(Message.valueOf("GET"));
- Assert.assertEquals("10",
- reply1.getMessage().getContent().toString(Charset.defaultCharset()));
+ Assert.assertEquals("10", reply1.getMessage().getContent().toStringUtf8());
for (int i = 0; i < 10; i++) {
client.io().send(Message.valueOf("INCREMENT"));
}
RaftClientReply reply2 = client.io().sendReadOnly(Message.valueOf("GET"));
- Assert.assertEquals("20",
- reply2.getMessage().getContent().toString(Charset.defaultCharset()));
+ Assert.assertEquals("20", reply2.getMessage().getContent().toStringUtf8());
for (int i = 0; i < 10; i++) {
client.io().send(Message.valueOf("INCREMENT"));
}
RaftClientReply reply3 = client.io().sendReadOnly(Message.valueOf("GET"));
- Assert.assertEquals("30",
- reply3.getMessage().getContent().toString(Charset.defaultCharset()));
+ Assert.assertEquals("30", reply3.getMessage().getContent().toStringUtf8());
}
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 9cc4c5b21..629a55a67 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -132,9 +132,12 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
updateLastAppliedTermIndex(term, index);
}
- @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
protected boolean updateLastAppliedTermIndex(long term, long index) {
- final TermIndex newTI = TermIndex.valueOf(term, index);
+ return updateLastAppliedTermIndex(TermIndex.valueOf(term, index));
+ }
+
+ protected boolean updateLastAppliedTermIndex(TermIndex newTI) {
+ Objects.requireNonNull(newTI, "newTI == null");
final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI);
if (!newTI.equals(oldTI)) {
LOG.trace("{}: update lastAppliedTermIndex from {} to {}", getId(), oldTI, newTI);
@@ -147,7 +150,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
}
synchronized (transactionFutures) {
- for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= index; ) {
+ for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= newTI.getIndex(); ) {
transactionFutures.remove(i).complete(null);
}
}