You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/05/14 16:32:11 UTC

[incubator-ratis] branch master updated: RATIS-846: create simplest possible example, a replicated counter (#60)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 800f2f1  RATIS-846: create simplest possible example, a replicated counter (#60)
800f2f1 is described below

commit 800f2f10f6ba007c8217ad75aaeedacaa18df306
Author: Isa Hekmatizadeh <es...@gmail.com>
AuthorDate: Thu May 14 16:32:02 2020 +0000

    RATIS-846: create simplest possible example, a replicated counter (#60)
---
 ratis-examples/README.md                           |  27 ++-
 .../ratis/examples/counter/CounterCommon.java      |  48 +++++
 .../examples/counter/client/CounterClient.java     |  94 +++++++++
 .../examples/counter/server/CounterServer.java     |  88 +++++++++
 .../counter/server/CounterStateMachine.java        | 210 +++++++++++++++++++++
 .../apache/ratis/examples/counter/TestCounter.java |  69 +++++++
 6 files changed, 535 insertions(+), 1 deletion(-)

diff --git a/ratis-examples/README.md b/ratis-examples/README.md
index ccabe0d..bc4ea71 100644
--- a/ratis-examples/README.md
+++ b/ratis-examples/README.md
@@ -19,9 +19,11 @@
 The repository can be complied using `mvn clean package -DskipTests` under the project root directory;
 see also [BUILDING.md](../BUILDING.md).
 
-All the scripts for running the examples are located in the [ratis-examples/src/main/bin](src/main/bin) directory;
+For Examples 1 and 2, all the scripts for running the examples are located in the [ratis-examples/src/main/bin](src/main/bin) directory;
 see below for the usage.
 
+Example 3 does not come with any script; to run it, refer to the [Example 3 run section](#run-counter-server-and-client).
+
 ## Example 1: FileStore
 
 **FileStore** is a high performance file service supporting *read*, *write* and *delete* operations.
@@ -114,7 +116,30 @@ Continue the server command example,
     ${BIN}/client.sh arithmetic assign --name c --value a+b --peers ${PEERS}
     ${BIN}/client.sh arithmetic get --name c --peers ${PEERS}
 
+## Example 3: Counter
+This example is designed to be the simplest possible example and, because of that,
+it does not follow the scripts and command line parameters of the previous
+examples.
+The goal of this example is to maintain and replicate a counter value across
+a cluster.
+The `CounterServer` class contains the main method to run the server; you can run it
+three times with three different parameters (1, 2 and 3).
+All addresses and ports of the peers are hardcoded in `CounterCommon`, so you don't
+need any extra configuration to run this example on your localhost.
+The `CounterClient` class contains the main method to run the client; the client sends
+several INCREMENT commands and, after that, it sends a GET command and prints the
+result, which should be the value of the counter.
+The 'Counter State Machine' is implemented in the `CounterStateMachine` class.
+You can find more details by reading the Javadoc of these classes.
+
+### Run Counter Server and Client
+Run the client and servers with these commands from the ratis-examples directory:
+for server: `java -cp target/*.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}`
+replace {serverIndex} with 1, 2, or 3
+for client: `java -cp target/*.jar org.apache.ratis.examples.counter.client.CounterClient`
+
 ## Pre-Setup Vagrant Pseudo Cluster
+Note: This option is only available for Examples 1 and 2.
 One can see the interactions of a three server Ratis cluster with a load-generator running against it
 by using the `run_all_tests.sh` script found in [dev-support/vagrant/](../dev-support/vagrant).
 See the [dev-support/vagrant/README.md](../dev-support/vagrant/README.md) for more on dependencies and what is setup.
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommon.java
new file mode 100644
index 0000000..56b77fe
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommon.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter;
+
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/** Constants shared by the Counter servers and the client. */
+public final class CounterCommon {
+  /** The hard-coded cluster members; unmodifiable so that no caller can alter the shared list. */
+  public static final List<RaftPeer> PEERS;
+
+  static {
+    final List<RaftPeer> peers = new ArrayList<>(3);
+    peers.add(new RaftPeer(RaftPeerId.getRaftPeerId("n1"), "127.0.0.1:6000"));
+    peers.add(new RaftPeer(RaftPeerId.getRaftPeerId("n2"), "127.0.0.1:6001"));
+    peers.add(new RaftPeer(RaftPeerId.getRaftPeerId("n3"), "127.0.0.1:6002"));
+    PEERS = java.util.Collections.unmodifiableList(peers);
+  }
+
+  private CounterCommon() {
+  }
+
+  private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
+  public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(RaftGroupId.valueOf(CLUSTER_GROUP_ID), PEERS);
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
new file mode 100644
index 0000000..9607178
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter.client;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.CounterCommon;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Counter client application: it sends a specific number of INCREMENT commands
+ * to the Counter cluster concurrently and, at the end, sends a GET command and
+ * prints the result.
+ * <p>
+ * The parameter of this application indicates the number of INCREMENT commands;
+ * if no parameter is given, the application uses the default value, which is 10.
+ */
+public final class CounterClient {
+
+  private CounterClient(){
+  }
+
+  public static void main(String[] args)
+      throws IOException, InterruptedException {
+    //indicate the number of INCREMENT command, set 10 if no parameter passed
+    int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
+
+    //use an executor service with 10 threads to send INCREMENT commands
+    // concurrently
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    //build the counter cluster client; it is closed when the block is left
+    try (RaftClient raftClient = buildClient()) {
+      System.out.printf("Sending %d increment command...\n", increment);
+      for (int i = 0; i < increment; i++) {
+        executorService.submit(() ->
+            raftClient.send(Message.valueOf("INCREMENT")));
+      }
+
+      //wait for the senders to finish; 500L keeps the timeout from overflowing int
+      executorService.shutdown();
+      executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
+
+      //send GET command and print the response
+      RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET"));
+      String response = count.getMessage().getContent().toString(Charset.defaultCharset());
+      System.out.println(response);
+    } finally {
+      //stop senders that are still running, e.g. after a timeout or a failure
+      executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * Build the RaftClient instance used to communicate with the Counter cluster.
+   *
+   * @return the created client of Counter cluster
+   */
+  private static RaftClient buildClient() {
+    RaftProperties raftProperties = new RaftProperties();
+    return RaftClient.newBuilder()
+        .setProperties(raftProperties)
+        .setRaftGroup(CounterCommon.RAFT_GROUP)
+        .setClientRpc(new GrpcFactory(new Parameters())
+            .newRaftClientRpc(ClientId.randomId(), raftProperties))
+        .build();
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
new file mode 100644
index 0000000..5f159b0
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter.server;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.CounterCommon;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Scanner;
+
+/**
+ * Simplest Ratis server, use a simple state machine {@link CounterStateMachine}
+ * which maintain a counter across multi server.
+ * This server application designed to run several times with different
+ * parameters (1,2 or 3). server addresses hard coded in {@link CounterCommon}
+ * <p>
+ * Run this application three times with three different parameter set-up a
+ * ratis cluster which maintain a counter value replicated in each server memory
+ */
+public final class CounterServer {
+
+  private CounterServer(){
+  }
+
+  public static void main(String[] args) throws IOException, InterruptedException {
+    if (args.length < 1) {
+      System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
+      System.err.println("{serverIndex} could be 1, 2 or 3");
+      System.exit(1);
+    }
+
+    //find current peer object based on application parameter
+    RaftPeer currentPeer =
+        CounterCommon.PEERS.get(Integer.parseInt(args[0]) - 1);
+
+    //create a property object
+    RaftProperties properties = new RaftProperties();
+
+    //set the storage directory (different for each peer) in RaftProperty object
+    File raftStorageDir = new File("./" + currentPeer.getId().toString());
+    RaftServerConfigKeys.setStorageDir(properties,
+        Collections.singletonList(raftStorageDir));
+
+    //set the port which server listen to in RaftProperty object
+    final int port = NetUtils.createSocketAddr(currentPeer.getAddress()).getPort();
+    GrpcConfigKeys.Server.setPort(properties, port);
+
+    //create the counter state machine which hold the counter value
+    CounterStateMachine counterStateMachine = new CounterStateMachine();
+
+    //create and start the Raft server
+    RaftServer server = RaftServer.newBuilder()
+        .setGroup(CounterCommon.RAFT_GROUP)
+        .setProperties(properties)
+        .setServerId(currentPeer.getId())
+        .setStateMachine(counterStateMachine)
+        .build();
+    server.start();
+
+    //exit when any input entered
+    Scanner scanner = new Scanner(System.in);
+    scanner.nextLine();
+    server.close();
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
new file mode 100644
index 0000000..7159ec1
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter.server;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * State machine implementation for Counter server application. This class
+ * keeps an {@link AtomicInteger} as its state and accepts two commands: GET,
+ * a read-only command handled by {@code query}, and INCREMENT, a transactional
+ * command handled by {@code applyTransaction}. The counter and the last applied
+ * term-index are always read and written together while holding {@code this}.
+ */
+public class CounterStateMachine extends BaseStateMachine {
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
+  private final AtomicInteger counter = new AtomicInteger(0);
+
+  /**
+   * Initialize the state machine by initializing the state machine storage and
+   * calling the load method, which reads the latest snapshot and restores it
+   * into the counter object.
+   *
+   * @param server      the current server information
+   * @param groupId     the cluster groupId
+   * @param raftStorage the raft storage which is used to keep raft related
+   *                    stuff
+   * @throws IOException if any error happens during load state
+   */
+  @Override
+  public void initialize(RaftServer server, RaftGroupId groupId,
+                         RaftStorage raftStorage) throws IOException {
+    super.initialize(server, groupId, raftStorage);
+    this.storage.init(raftStorage);
+    load(storage.getLatestSnapshot());
+  }
+
+  /**
+   * Very similar to the initialize method, but does not initialize the storage
+   * system: the state machine is reinitialized from the PAUSE state, so the
+   * storage system has been initialized before.
+   *
+   * @throws IOException if any error happens during load state
+   */
+  @Override
+  public void reinitialize() throws IOException {
+    load(storage.getLatestSnapshot());
+  }
+
+  /**
+   * Store the current state as a snapshot file in the stateMachineStorage.
+   *
+   * @return the snapshot index, or {@link RaftLog#INVALID_LOG_INDEX} if the write failed
+   */
+  @Override
+  public long takeSnapshot() {
+    //read the last applied term-index and the matching counter value atomically
+    final TermIndex last;
+    final AtomicInteger copy;
+    synchronized (this) {
+      last = getLastAppliedTermIndex();
+      copy = new AtomicInteger(counter.get());
+    }
+    //serialize the copy as an AtomicInteger (the type load expects) into the snapshot file
+    final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex());
+    try (ObjectOutputStream out = new ObjectOutputStream(
+        new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
+      out.writeObject(copy);
+    } catch (IOException ioe) {
+      LOG.warn("Failed to write snapshot file \"" + snapshotFile
+          + "\", last applied index=" + last, ioe);
+      return RaftLog.INVALID_LOG_INDEX; //do not report a snapshot that was not written
+    }
+    return last.getIndex();
+  }
+
+  /**
+   * Load the state of the state machine from the storage.
+   *
+   * @param snapshot to load
+   * @return the index of the snapshot or -1 if snapshot is invalid
+   * @throws IOException if any error happens during read from storage
+   */
+  private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+    //check the snapshot nullity
+    if (snapshot == null) {
+      LOG.warn("The snapshot info is null.");
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    //check the existence of the snapshot file
+    final File snapshotFile = snapshot.getFile().getPath().toFile();
+    if (!snapshotFile.exists()) {
+      LOG.warn("The snapshot file {} does not exist for snapshot {}",
+          snapshotFile, snapshot);
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    //derive the TermIndex of the snapshot from the file name pattern of the snapshot
+    final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
+
+    //read the stored counter first, so that a failed read leaves the state unchanged
+    final AtomicInteger loaded;
+    try (ObjectInputStream in = new ObjectInputStream(
+        new BufferedInputStream(new FileInputStream(snapshotFile)))) {
+      loaded = JavaUtils.cast(in.readObject());
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+
+    //restore the counter and the last applied term-index together
+    synchronized (this) {
+      setLastAppliedTermIndex(last);
+      counter.set(loaded.get());
+    }
+    return last.getIndex();
+  }
+
+  /**
+   * Handle GET command, which is used by clients to get the counter value.
+   *
+   * @param request the GET message
+   * @return the Message containing the current counter value
+   */
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    String msg = request.getContent().toString(Charset.defaultCharset());
+    if (!msg.equals("GET")) {
+      return CompletableFuture.completedFuture(
+          Message.valueOf("Invalid Command"));
+    }
+    return CompletableFuture.completedFuture(
+        Message.valueOf(counter.toString()));
+  }
+
+  /**
+   * Apply the INCREMENT command by incrementing the counter object.
+   *
+   * @param trx the transaction context
+   * @return the message containing the updated counter value
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+
+    //check if the command is valid
+    String logData = entry.getStateMachineLogEntry().getLogData()
+        .toString(Charset.defaultCharset());
+    if (!logData.equals("INCREMENT")) {
+      return CompletableFuture.completedFuture(
+          Message.valueOf("Invalid Command"));
+    }
+
+    //update the last applied term-index and increment the counter as one
+    // atomic step, so a concurrent snapshot never sees one without the other
+    final long index = entry.getIndex();
+    final int incremented;
+    synchronized (this) {
+      updateLastAppliedTermIndex(entry.getTerm(), index);
+      incremented = counter.incrementAndGet();
+    }
+
+    //if leader, log the incremented value and its log index
+    if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) {
+      LOG.info("{}: Increment to {}", index, incremented);
+    }
+
+    //return the new value of the counter to the client
+    return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
+  }
+}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
new file mode 100644
index 0000000..beb71be
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.examples.counter;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.ParameterizedBaseTest;
+import org.apache.ratis.examples.counter.server.CounterStateMachine;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Collection;
+
+public class TestCounter extends ParameterizedBaseTest {
+
+  /** Supplies the clusters created by getMiniRaftClusters for {@link CounterStateMachine}. */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    return getMiniRaftClusters(CounterStateMachine.class, 3);
+  }
+
+  /** The cluster supplied by {@link #data()} for the current run. */
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  /**
+   * Sends three rounds of ten INCREMENT commands and, after each round, checks
+   * that a GET returns the running total (10, 20 and then 30).
+   */
+  @Test
+  public void testSeveralCounter() throws IOException, InterruptedException {
+    setAndStart(cluster);
+    try (final RaftClient client = cluster.createClient()) {
+      final int incrementsPerRound = 10;
+      for (int round = 1; round <= 3; round++) {
+        //apply one round of INCREMENT commands
+        for (int i = 0; i < incrementsPerRound; i++) {
+          client.send(Message.valueOf("INCREMENT"));
+        }
+        //the counter must now equal the total number of increments sent
+        final RaftClientReply reply = client.sendReadOnly(Message.valueOf("GET"));
+        final String expected = String.valueOf(round * incrementsPerRound);
+        Assert.assertEquals(expected,
+            reply.getMessage().getContent().toString(Charset.defaultCharset()));
+      }
+    }
+  }
+}