You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/12/09 21:53:57 UTC

[ratis] branch master updated: RATIS-1758. Add linearizable read in Counter example (#797)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ff05bac1b RATIS-1758. Add linearizable read in Counter example (#797)
ff05bac1b is described below

commit ff05bac1bc7c103f396b82dedec0af817bbb96e5
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Sat Dec 10 05:53:51 2022 +0800

    RATIS-1758. Add linearizable read in Counter example (#797)
---
 ratis-examples/README.md                           |  8 +++---
 .../apache/ratis/examples/common/Constants.java    | 31 +++++++++++++++++++---
 .../examples/counter/client/CounterClient.java     | 31 ++++++++++++++++++++++
 .../examples/counter/server/CounterServer.java     | 24 ++++++++++++-----
 .../counter/server/CounterStateMachine.java        | 18 +++++++++++++
 .../apache/ratis/examples/debug/server/Server.java |  3 ++-
 ratis-examples/src/main/resources/conf.properties  |  4 ++-
 7 files changed, 104 insertions(+), 15 deletions(-)

diff --git a/ratis-examples/README.md b/ratis-examples/README.md
index de44a9076..1e50058dc 100644
--- a/ratis-examples/README.md
+++ b/ratis-examples/README.md
@@ -134,10 +134,10 @@ result which should be the value of the counter.
 You can find more detail by reading these classes javaDocs.
 
 ### Run Counter Server and Client
-run the client and servers by these commands from ratis-examples directory:
-for server: `java -cp target/*.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}`
-replace {serverIndex} with 1, 2, or 3
-for client: `java -cp target/*.jar org.apache.ratis.examples.counter.client.CounterClient`
+run the client and servers by these commands from ratis directory:
+for server: `java -cp ./ratis-examples/target/*.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}`
+replace {serverIndex} with 0, 1, or 2
+for client: `java -cp ./ratis-examples/target/*.jar org.apache.ratis.examples.counter.client.CounterClient`
 
 ## Pre-Setup Vagrant Pseudo Cluster
 Note: This option is only available to Example 1 and 2
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
index 9fa873fc2..c7559597e 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
@@ -21,6 +21,7 @@ package org.apache.ratis.examples.common;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.TimeDuration;
 
 import java.io.BufferedReader;
 import java.io.FileInputStream;
@@ -30,11 +31,15 @@ import java.io.InputStreamReader;
 import java.io.Reader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Constants across servers and clients
@@ -42,6 +47,7 @@ import java.util.UUID;
 public final class Constants {
   public static final List<RaftPeer> PEERS;
   public static final String PATH;
+  public static final List<TimeDuration> SIMULATED_SLOWNESS;
 
   static {
     final Properties properties = new Properties();
@@ -53,20 +59,39 @@ public final class Constants {
     } catch (IOException e) {
       throw new IllegalStateException("Failed to load " + conf, e);
     }
-    final String key = "raft.server.address.list";
-    final String[] addresses = Optional.ofNullable(properties.getProperty(key))
+    Function<String, String[]> parseConfList = confKey ->
+        Optional.ofNullable(properties.getProperty(confKey))
         .map(s -> s.split(","))
         .orElse(null);
+    final String key = "raft.server.address.list";
+    final String[] addresses = parseConfList.apply(key);
     if (addresses == null || addresses.length == 0) {
       throw new IllegalArgumentException("Failed to get " + key + " from " + conf);
     }
 
+    final String priorityKey = "raft.server.priority.list";
+    final String[] priorities = parseConfList.apply(priorityKey);
+    if (priorities != null && priorities.length != addresses.length) {
+      throw new IllegalArgumentException("priority should be assigned to each server in " + conf);
+    }
+
+    final String slownessKey = "raft.server.simulated-slowness.list";
+    final String[] slowness = parseConfList.apply(slownessKey);
+    if (slowness != null && slowness.length != addresses.length) {
+      throw new IllegalArgumentException("simulated-slowness should be assigned to each server in" + conf);
+    }
+    SIMULATED_SLOWNESS = slowness == null ? null:
+        Arrays.stream(slowness)
+          .map(s -> TimeDuration.valueOf(s, TimeUnit.SECONDS))
+          .collect(Collectors.toList());
+
     final String key1 = "raft.server.root.storage.path";
     final String path = properties.getProperty(key1);
     PATH = path == null ? "./ratis-examples/target" : path;
     final List<RaftPeer> peers = new ArrayList<>(addresses.length);
     for (int i = 0; i < addresses.length; i++) {
-      peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).build());
+      final int priority = priorities == null ? 0 : Integer.parseInt(priorities[i]);
+      peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).setPriority(priority).build());
     }
     PEERS = Collections.unmodifiableList(peers);
   }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index c0350ec72..7076809f5 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -90,6 +91,36 @@ public final class CounterClient implements Closeable {
     final RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage());
     final String count = reply.getMessage().getContent().toStringUtf8();
     System.out.println("Current counter value: " + count);
+
+    // using Linearizable Read
+    futures.clear();
+    final long startTime = System.currentTimeMillis();
+    final ExecutorService executor = Executors.newFixedThreadPool(Constants.PEERS.size());
+    Constants.PEERS.forEach(p -> {
+      final Future<RaftClientReply> f = CompletableFuture.supplyAsync(() -> {
+                try {
+                  return client.io().sendReadOnly(CounterCommand.GET.getMessage(), p.getId());
+                } catch (IOException e) {
+                  System.err.println("Failed read-only request");
+                  return RaftClientReply.newBuilder().setSuccess(false).build();
+                }
+              }, executor).whenCompleteAsync((r, ex) -> {
+                if (ex != null || !r.isSuccess()) {
+                  System.err.println("Failed " + r);
+                  return;
+                }
+                final long endTime = System.currentTimeMillis();
+                final long elapsedSec = (endTime-startTime) / 1000;
+                final String countValue = r.getMessage().getContent().toStringUtf8();
+                System.out.println("read from " + p.getId() + " and get counter value: " + countValue
+                    + ", time elapsed: " + elapsedSec + " seconds");
+              });
+      futures.add(f);
+    });
+
+    for (Future<RaftClientReply> f : futures) {
+      f.get();
+    }
   }
 
   public static void main(String[] args) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
index 7a6367ece..fab7b36c3 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -24,12 +24,14 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.TimeDuration;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Scanner;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -46,19 +48,27 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 public final class CounterServer implements Closeable {
   private final RaftServer server;
 
-  public CounterServer(RaftPeer peer, File storageDir) throws IOException {
+  public CounterServer(RaftPeer peer, File storageDir, TimeDuration simulatedSlowness) throws IOException {
     //create a property object
     final RaftProperties properties = new RaftProperties();
 
     //set the storage directory (different for each peer) in the RaftProperty object
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
 
+    //set the read policy to Linearizable Read.
+    //the Default policy will route read-only requests to leader and directly query leader statemachine.
+    //Linearizable Read allows to route read-only requests to any group member
+    //and uses ReadIndex to guarantee strong consistency.
+    RaftServerConfigKeys.Read.setOption(properties, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
+    //set the linearizable read timeout
+    RaftServerConfigKeys.Read.setTimeout(properties, TimeDuration.ONE_MINUTE);
+
     //set the port (different for each peer) in RaftProperty object
     final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
     GrpcConfigKeys.Server.setPort(properties, port);
 
     //create the counter state machine which holds the counter value
-    final CounterStateMachine counterStateMachine = new CounterStateMachine();
+    final CounterStateMachine counterStateMachine = new CounterStateMachine(simulatedSlowness);
 
     //build the Raft server
     this.server = RaftServer.newBuilder()
@@ -88,8 +98,10 @@ public final class CounterServer implements Closeable {
       if (peerIndex < 0 || peerIndex > 2) {
         throw new IllegalArgumentException("The server index must be 0, 1 or 2: peerIndex=" + peerIndex);
       }
-
-      startServer(peerIndex);
+      TimeDuration simulatedSlowness = Optional.ofNullable(Constants.SIMULATED_SLOWNESS)
+                  .map(slownessList -> slownessList.get(peerIndex))
+                  .orElse(TimeDuration.ZERO);
+      startServer(peerIndex, simulatedSlowness);
     } catch(Throwable e) {
       e.printStackTrace();
       System.err.println();
@@ -102,13 +114,13 @@ public final class CounterServer implements Closeable {
     }
   }
 
-  private static void startServer(int peerIndex) throws IOException {
+  private static void startServer(int peerIndex, TimeDuration simulatedSlowness) throws IOException {
     //get peer and define storage dir
     final RaftPeer currentPeer = Constants.PEERS.get(peerIndex);
     final File storageDir = new File("./" + currentPeer.getId());
 
     //start a counter server
-    try(CounterServer counterServer = new CounterServer(currentPeer, storageDir)) {
+    try(CounterServer counterServer = new CounterServer(currentPeer, storageDir, simulatedSlowness)) {
       counterServer.start();
 
       //exit when any input entered
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index f383d3a83..299b67325 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -34,6 +34,7 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.TimeDuration;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -79,6 +80,15 @@ public class CounterStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
   private final AtomicInteger counter = new AtomicInteger(0);
 
+  private final TimeDuration simulatedSlowness;
+
+  CounterStateMachine(TimeDuration simulatedSlowness) {
+    this.simulatedSlowness = simulatedSlowness;
+  }
+  CounterStateMachine() {
+    this.simulatedSlowness = TimeDuration.ZERO;
+  }
+
   /** @return the current state. */
   private synchronized CounterState getState() {
     return new CounterState(getLastAppliedTermIndex(), counter.get());
@@ -90,6 +100,14 @@ public class CounterStateMachine extends BaseStateMachine {
   }
 
   private synchronized int incrementCounter(TermIndex termIndex) {
+    try {
+      if (!simulatedSlowness.equals(TimeDuration.ZERO)) {
+        simulatedSlowness.sleep();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("{}: get interrupted in simulated slowness sleep before apply transaction", this);
+      Thread.currentThread().interrupt();
+    }
     updateLastAppliedTermIndex(termIndex);
     return counter.incrementAndGet();
   }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java
index a2ab8e172..4377a1420 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java
@@ -21,6 +21,7 @@ package org.apache.ratis.examples.debug.server;
 import org.apache.ratis.examples.common.Constants;
 import org.apache.ratis.examples.counter.server.CounterServer;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.TimeDuration;
 
 import java.io.File;
 import java.io.IOException;
@@ -45,7 +46,7 @@ public final class Server {
         .findFirst().orElseThrow(() -> new IllegalArgumentException("Peer not found: " + args[0]));
 
     final File storageDir = new File(Constants.PATH, currentPeer.getId().toString());
-    final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
+    final CounterServer counterServer = new CounterServer(currentPeer, storageDir, TimeDuration.ZERO);
     counterServer.start();
   }
 }
diff --git a/ratis-examples/src/main/resources/conf.properties b/ratis-examples/src/main/resources/conf.properties
index 839090101..8e0a901cd 100644
--- a/ratis-examples/src/main/resources/conf.properties
+++ b/ratis-examples/src/main/resources/conf.properties
@@ -15,4 +15,6 @@
 # limitations under the License.
 
 raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124
-# raft.server.root.storage.path
\ No newline at end of file
+# raft.server.root.storage.path
+# raft.server.priority.list=1,0,0
+# raft.server.simulated-slowness.list=0,1s,0
\ No newline at end of file