You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/05/14 16:32:11 UTC
[incubator-ratis] branch master updated: RATIS-846: create simplest
possible example, a replicated counter (#60)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 800f2f1 RATIS-846: create simplest possible example, a replicated counter (#60)
800f2f1 is described below
commit 800f2f10f6ba007c8217ad75aaeedacaa18df306
Author: Isa Hekmatizadeh <es...@gmail.com>
AuthorDate: Thu May 14 16:32:02 2020 +0000
RATIS-846: create simplest possible example, a replicated counter (#60)
---
ratis-examples/README.md | 27 ++-
.../ratis/examples/counter/CounterCommon.java | 48 +++++
.../examples/counter/client/CounterClient.java | 94 +++++++++
.../examples/counter/server/CounterServer.java | 88 +++++++++
.../counter/server/CounterStateMachine.java | 210 +++++++++++++++++++++
.../apache/ratis/examples/counter/TestCounter.java | 69 +++++++
6 files changed, 535 insertions(+), 1 deletion(-)
diff --git a/ratis-examples/README.md b/ratis-examples/README.md
index ccabe0d..bc4ea71 100644
--- a/ratis-examples/README.md
+++ b/ratis-examples/README.md
@@ -19,9 +19,11 @@
The repository can be complied using `mvn clean package -DskipTests` under the project root directory;
see also [BUILDING.md](../BUILDING.md).
-All the scripts for running the examples are located in the [ratis-examples/src/main/bin](src/main/bin) directory;
+For the Example 1 and 2, All the scripts for running the examples are located in the [ratis-examples/src/main/bin](src/main/bin) directory;
see below for the usage.
+Example 3 does not contain any script to run it refer to [Example 3 run section](#run-counter-server-and-client).
+
## Example 1: FileStore
**FileStore** is a high performance file service supporting *read*, *write* and *delete* operations.
@@ -114,7 +116,30 @@ Continue the server command example,
${BIN}/client.sh arithmetic assign --name c --value a+b --peers ${PEERS}
${BIN}/client.sh arithmetic get --name c --peers ${PEERS}
+## Example 3: Counter
+This example designed to be the simplest possible example and because of that
+this example does not follow the scripts and command line parameters of previous
+examples.
+The Goal of this example is to maintain and replicate a counter value across
+a cluster.
+`CounterServer` class contains the main method to run the server and you can run it
+three times with three different parameters(1,2 and 3).
+all address and ports of the peers hardcoded in `CounterCommon`, so you don't
+need any extra configuration to run this example on your localhost.
+`CounterClient` class contains the main method to run the client,the client sends
+several INCREMENT command and after that, it sends a GET command and prints the
+result which should be the value of the counter.
+'Counter State Machine' implemented in `CounterStateMachine` class.
+You can find more detail by reading these classes javaDocs.
+
+### Run Counter Server and Client
+run the client and servers by these commands from ratis-examples directory:
+for server: `java -cp target/*.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}`
+replace {serverIndex} with 1, 2, or 3
+for client: `java -cp target/*.jar org.apache.ratis.examples.counter.client.CounterClient`
+
## Pre-Setup Vagrant Pseudo Cluster
+Note: This option is only available to Example 1 and 2
One can see the interactions of a three server Ratis cluster with a load-generator running against it
by using the `run_all_tests.sh` script found in [dev-support/vagrant/](../dev-support/vagrant).
See the [dev-support/vagrant/README.md](../dev-support/vagrant/README.md) for more on dependencies and what is setup.
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommon.java
new file mode 100644
index 0000000..56b77fe
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommon.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter;
+
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Common Constant across servers and client
+ */
+public final class CounterCommon {
+ public static final List<RaftPeer> PEERS = new ArrayList<>(3);
+
+ static {
+ PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n1"), "127.0.0.1:6000"));
+ PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n2"), "127.0.0.1:6001"));
+ PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n3"), "127.0.0.1:6002"));
+ }
+
+ private CounterCommon() {
+ }
+
+ private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
+ public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
+ RaftGroupId.valueOf(CounterCommon.CLUSTER_GROUP_ID), PEERS);
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
new file mode 100644
index 0000000..9607178
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter.client;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.CounterCommon;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Counter client application, this application sends specific number of
+ * INCREMENT command to the Counter cluster and at the end sends a GET command
+ * and print the result
+ * <p>
+ * Parameter to this application indicate the number of INCREMENT command, if no
+ * parameter found, application use default value which is 10
+ */
+public final class CounterClient {
+
+ private CounterClient(){
+ }
+
+ public static void main(String[] args)
+ throws IOException, InterruptedException {
+ //indicate the number of INCREMENT command, set 10 if no parameter passed
+ int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
+
+ //build the counter cluster client
+ RaftClient raftClient = buildClient();
+
+ //use a executor service with 10 thread to send INCREMENT commands
+ // concurrently
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ //send INCREMENT commands concurrently
+ System.out.printf("Sending %d increment command...\n", increment);
+ for (int i = 0; i < increment; i++) {
+ executorService.submit(() ->
+ raftClient.send(Message.valueOf("INCREMENT")));
+ }
+
+ //shutdown the executor service and wait until they finish their work
+ executorService.shutdown();
+ executorService.awaitTermination(increment * 500, TimeUnit.MILLISECONDS);
+
+ //send GET command and print the response
+ RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET"));
+ String response = count.getMessage().getContent().toString(Charset.defaultCharset());
+ System.out.println(response);
+ }
+
+ /**
+ * build the RaftClient instance which is used to communicate to
+ * Counter cluster
+ *
+ * @return the created client of Counter cluster
+ */
+ private static RaftClient buildClient() {
+ RaftProperties raftProperties = new RaftProperties();
+ RaftClient.Builder builder = RaftClient.newBuilder()
+ .setProperties(raftProperties)
+ .setRaftGroup(CounterCommon.RAFT_GROUP)
+ .setClientRpc(
+ new GrpcFactory(new Parameters())
+ .newRaftClientRpc(ClientId.randomId(), raftProperties));
+ return builder.build();
+ }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
new file mode 100644
index 0000000..5f159b0
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter.server;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.CounterCommon;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Scanner;
+
+/**
+ * Simplest Ratis server, use a simple state machine {@link CounterStateMachine}
+ * which maintain a counter across multi server.
+ * This server application designed to run several times with different
+ * parameters (1,2 or 3). server addresses hard coded in {@link CounterCommon}
+ * <p>
+ * Run this application three times with three different parameter set-up a
+ * ratis cluster which maintain a counter value replicated in each server memory
+ */
+public final class CounterServer {
+
+ private CounterServer(){
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ if (args.length < 1) {
+ System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
+ System.err.println("{serverIndex} could be 1, 2 or 3");
+ System.exit(1);
+ }
+
+ //find current peer object based on application parameter
+ RaftPeer currentPeer =
+ CounterCommon.PEERS.get(Integer.parseInt(args[0]) - 1);
+
+ //create a property object
+ RaftProperties properties = new RaftProperties();
+
+ //set the storage directory (different for each peer) in RaftProperty object
+ File raftStorageDir = new File("./" + currentPeer.getId().toString());
+ RaftServerConfigKeys.setStorageDir(properties,
+ Collections.singletonList(raftStorageDir));
+
+ //set the port which server listen to in RaftProperty object
+ final int port = NetUtils.createSocketAddr(currentPeer.getAddress()).getPort();
+ GrpcConfigKeys.Server.setPort(properties, port);
+
+ //create the counter state machine which hold the counter value
+ CounterStateMachine counterStateMachine = new CounterStateMachine();
+
+ //create and start the Raft server
+ RaftServer server = RaftServer.newBuilder()
+ .setGroup(CounterCommon.RAFT_GROUP)
+ .setProperties(properties)
+ .setServerId(currentPeer.getId())
+ .setStateMachine(counterStateMachine)
+ .build();
+ server.start();
+
+ //exit when any input entered
+ Scanner scanner = new Scanner(System.in);
+ scanner.nextLine();
+ server.close();
+ }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
new file mode 100644
index 0000000..7159ec1
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter.server;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * State machine implementation for Counter server application. This class
+ * maintain a {@link AtomicInteger} object as a state and accept two commands:
+ * GET and INCREMENT, GET is a ReadOnly command which will be handled by
+ * {@code query} method however INCREMENT is a transactional command which
+ * will be handled by {@code applyTransaction}.
+ */
+public class CounterStateMachine extends BaseStateMachine {
+ private final SimpleStateMachineStorage storage =
+ new SimpleStateMachineStorage();
+ private AtomicInteger counter = new AtomicInteger(0);
+
+ /**
+ * initialize the state machine by initilize the state machine storage and
+ * calling the load method which reads the last applied command and restore it
+ * in counter object)
+ *
+ * @param server the current server information
+ * @param groupId the cluster groupId
+ * @param raftStorage the raft storage which is used to keep raft related
+ * stuff
+ * @throws IOException if any error happens during load state
+ */
+ @Override
+ public void initialize(RaftServer server, RaftGroupId groupId,
+ RaftStorage raftStorage) throws IOException {
+ super.initialize(server, groupId, raftStorage);
+ this.storage.init(raftStorage);
+ load(storage.getLatestSnapshot());
+ }
+
+ /**
+ * very similar to initialize method, but doesn't initialize the storage
+ * system because the state machine reinitialized from the PAUSE state and
+ * storage system initialized before.
+ *
+ * @throws IOException if any error happens during load state
+ */
+ @Override
+ public void reinitialize() throws IOException {
+ load(storage.getLatestSnapshot());
+ }
+
+ /**
+ * Store the current state as an snapshot file in the stateMachineStorage.
+ *
+ * @return the index of the snapshot
+ */
+ @Override
+ public long takeSnapshot() {
+ //get the last applied index
+ final TermIndex last = getLastAppliedTermIndex();
+
+ //create a file with a proper name to store the snapshot
+ final File snapshotFile =
+ storage.getSnapshotFile(last.getTerm(), last.getIndex());
+
+ //serialize the counter object and write it into the snapshot file
+ try (ObjectOutputStream out = new ObjectOutputStream(
+ new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
+ out.writeObject(counter);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ + "\", last applied index=" + last);
+ }
+
+ //return the index of the stored snapshot (which is the last applied one)
+ return last.getIndex();
+ }
+
+ /**
+ * Load the state of the state machine from the storage.
+ *
+ * @param snapshot to load
+ * @return the index of the snapshot or -1 if snapshot is invalid
+ * @throws IOException if any error happens during read from storage
+ */
+ private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+ //check the snapshot nullity
+ if (snapshot == null) {
+ LOG.warn("The snapshot info is null.");
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ //check the existance of the snapshot file
+ final File snapshotFile = snapshot.getFile().getPath().toFile();
+ if (!snapshotFile.exists()) {
+ LOG.warn("The snapshot file {} does not exist for snapshot {}",
+ snapshotFile, snapshot);
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ //load the TermIndex object for the snapshot using the file name pattern of
+ // the snapshot
+ final TermIndex last =
+ SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
+
+ //read the file and cast it to the AtomicInteger and set the counter
+ try (ObjectInputStream in = new ObjectInputStream(
+ new BufferedInputStream(new FileInputStream(snapshotFile)))) {
+ //set the last applied termIndex to the termIndex of the snapshot
+ setLastAppliedTermIndex(last);
+
+ //read, cast and set the counter
+ counter = JavaUtils.cast(in.readObject());
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+
+ return last.getIndex();
+ }
+
+ /**
+ * Handle GET command, which used by clients to get the counter value.
+ *
+ * @param request the GET message
+ * @return the Message containing the current counter value
+ */
+ @Override
+ public CompletableFuture<Message> query(Message request) {
+ String msg = request.getContent().toString(Charset.defaultCharset());
+ if (!msg.equals("GET")) {
+ return CompletableFuture.completedFuture(
+ Message.valueOf("Invalid Command"));
+ }
+ return CompletableFuture.completedFuture(
+ Message.valueOf(counter.toString()));
+ }
+
+ /**
+ * Apply the INCREMENT command by incrementing the counter object.
+ *
+ * @param trx the transaction context
+ * @return the message containing the updated counter value
+ */
+ @Override
+ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+ final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+
+ //check if the command is valid
+ String logData = entry.getStateMachineLogEntry().getLogData()
+ .toString(Charset.defaultCharset());
+ if (!logData.equals("INCREMENT")) {
+ return CompletableFuture.completedFuture(
+ Message.valueOf("Invalid Command"));
+ }
+ //update the last applied term and index
+ final long index = entry.getIndex();
+ updateLastAppliedTermIndex(entry.getTerm(), index);
+
+ //actual execution of the command: increment the counter
+ counter.incrementAndGet();
+
+ //return the new value of the counter to the client
+ final CompletableFuture<Message> f =
+ CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+
+ //if leader, log the incremented value and it's log index
+ if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) {
+ LOG.info("{}: Increment to {}", index, counter.toString());
+ }
+
+ return f;
+ }
+}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
new file mode 100644
index 0000000..beb71be
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.ParameterizedBaseTest;
+import org.apache.ratis.examples.counter.server.CounterStateMachine;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Collection;
+
+public class TestCounter extends ParameterizedBaseTest {
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() throws IOException {
+ return getMiniRaftClusters(CounterStateMachine.class, 3);
+ }
+
+ @Parameterized.Parameter
+ public MiniRaftCluster cluster;
+
+ @Test
+ public void testSeveralCounter() throws IOException, InterruptedException {
+ setAndStart(cluster);
+ try (final RaftClient client = cluster.createClient()) {
+ for (int i = 0; i < 10; i++) {
+ client.send(Message.valueOf("INCREMENT"));
+ }
+ RaftClientReply reply1 = client.sendReadOnly(Message.valueOf("GET"));
+ Assert.assertEquals("10",
+ reply1.getMessage().getContent().toString(Charset.defaultCharset()));
+ for (int i = 0; i < 10; i++) {
+ client.send(Message.valueOf("INCREMENT"));
+ }
+ RaftClientReply reply2 = client.sendReadOnly(Message.valueOf("GET"));
+ Assert.assertEquals("20",
+ reply2.getMessage().getContent().toString(Charset.defaultCharset()));
+ for (int i = 0; i < 10; i++) {
+ client.send(Message.valueOf("INCREMENT"));
+ }
+ RaftClientReply reply3 = client.sendReadOnly(Message.valueOf("GET"));
+ Assert.assertEquals("30",
+ reply3.getMessage().getContent().toString(Charset.defaultCharset()));
+ }
+ }
+}