You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/12/09 21:53:57 UTC
[ratis] branch master updated: RATIS-1758. Add linearizable read in Counter example (#797)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ff05bac1b RATIS-1758. Add linearizable read in Counter example (#797)
ff05bac1b is described below
commit ff05bac1bc7c103f396b82dedec0af817bbb96e5
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Sat Dec 10 05:53:51 2022 +0800
RATIS-1758. Add linearizable read in Counter example (#797)
---
ratis-examples/README.md | 8 +++---
.../apache/ratis/examples/common/Constants.java | 31 +++++++++++++++++++---
.../examples/counter/client/CounterClient.java | 31 ++++++++++++++++++++++
.../examples/counter/server/CounterServer.java | 24 ++++++++++++-----
.../counter/server/CounterStateMachine.java | 18 +++++++++++++
.../apache/ratis/examples/debug/server/Server.java | 3 ++-
ratis-examples/src/main/resources/conf.properties | 4 ++-
7 files changed, 104 insertions(+), 15 deletions(-)
diff --git a/ratis-examples/README.md b/ratis-examples/README.md
index de44a9076..1e50058dc 100644
--- a/ratis-examples/README.md
+++ b/ratis-examples/README.md
@@ -134,10 +134,10 @@ result which should be the value of the counter.
You can find more detail by reading these classes javaDocs.
### Run Counter Server and Client
-run the client and servers by these commands from ratis-examples directory:
-for server: `java -cp target/*.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}`
-replace {serverIndex} with 1, 2, or 3
-for client: `java -cp target/*.jar org.apache.ratis.examples.counter.client.CounterClient`
+Run the client and servers with the following commands from the ratis root directory:
+for server: `java -cp ./ratis-examples/target/*.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}`
+replace {serverIndex} with 0, 1, or 2
+for client: `java -cp ./ratis-examples/target/*.jar org.apache.ratis.examples.counter.client.CounterClient`
## Pre-Setup Vagrant Pseudo Cluster
Note: This option is only available to Example 1 and 2
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
index 9fa873fc2..c7559597e 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
@@ -21,6 +21,7 @@ package org.apache.ratis.examples.common;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.TimeDuration;
import java.io.BufferedReader;
import java.io.FileInputStream;
@@ -30,11 +31,15 @@ import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Constants across servers and clients
@@ -42,6 +47,7 @@ import java.util.UUID;
public final class Constants {
public static final List<RaftPeer> PEERS;
public static final String PATH;
+ public static final List<TimeDuration> SIMULATED_SLOWNESS;
static {
final Properties properties = new Properties();
@@ -53,20 +59,39 @@ public final class Constants {
} catch (IOException e) {
throw new IllegalStateException("Failed to load " + conf, e);
}
- final String key = "raft.server.address.list";
- final String[] addresses = Optional.ofNullable(properties.getProperty(key))
+ Function<String, String[]> parseConfList = confKey ->
+ Optional.ofNullable(properties.getProperty(confKey))
.map(s -> s.split(","))
.orElse(null);
+ final String key = "raft.server.address.list";
+ final String[] addresses = parseConfList.apply(key);
if (addresses == null || addresses.length == 0) {
throw new IllegalArgumentException("Failed to get " + key + " from " + conf);
}
+ final String priorityKey = "raft.server.priority.list";
+ final String[] priorities = parseConfList.apply(priorityKey);
+ if (priorities != null && priorities.length != addresses.length) {
+ throw new IllegalArgumentException("priority should be assigned to each server in " + conf);
+ }
+
+ final String slownessKey = "raft.server.simulated-slowness.list";
+ final String[] slowness = parseConfList.apply(slownessKey);
+ if (slowness != null && slowness.length != addresses.length) {
+ throw new IllegalArgumentException("simulated-slowness should be assigned to each server in" + conf);
+ }
+ SIMULATED_SLOWNESS = slowness == null ? null:
+ Arrays.stream(slowness)
+ .map(s -> TimeDuration.valueOf(s, TimeUnit.SECONDS))
+ .collect(Collectors.toList());
+
final String key1 = "raft.server.root.storage.path";
final String path = properties.getProperty(key1);
PATH = path == null ? "./ratis-examples/target" : path;
final List<RaftPeer> peers = new ArrayList<>(addresses.length);
for (int i = 0; i < addresses.length; i++) {
- peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).build());
+ final int priority = priorities == null ? 0 : Integer.parseInt(priorities[i]);
+ peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).setPriority(priority).build());
}
PEERS = Collections.unmodifiableList(peers);
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index c0350ec72..7076809f5 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -90,6 +91,36 @@ public final class CounterClient implements Closeable {
final RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage());
final String count = reply.getMessage().getContent().toStringUtf8();
System.out.println("Current counter value: " + count);
+
+ // using Linearizable Read
+ futures.clear();
+ final long startTime = System.currentTimeMillis();
+ final ExecutorService executor = Executors.newFixedThreadPool(Constants.PEERS.size());
+ Constants.PEERS.forEach(p -> {
+ final Future<RaftClientReply> f = CompletableFuture.supplyAsync(() -> {
+ try {
+ return client.io().sendReadOnly(CounterCommand.GET.getMessage(), p.getId());
+ } catch (IOException e) {
+ System.err.println("Failed read-only request");
+ return RaftClientReply.newBuilder().setSuccess(false).build();
+ }
+ }, executor).whenCompleteAsync((r, ex) -> {
+ if (ex != null || !r.isSuccess()) {
+ System.err.println("Failed " + r);
+ return;
+ }
+ final long endTime = System.currentTimeMillis();
+ final long elapsedSec = (endTime-startTime) / 1000;
+ final String countValue = r.getMessage().getContent().toStringUtf8();
+ System.out.println("read from " + p.getId() + " and get counter value: " + countValue
+ + ", time elapsed: " + elapsedSec + " seconds");
+ });
+ futures.add(f);
+ });
+
+ for (Future<RaftClientReply> f : futures) {
+ f.get();
+ }
}
public static void main(String[] args) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
index 7a6367ece..fab7b36c3 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -24,12 +24,14 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.TimeDuration;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Optional;
import java.util.Scanner;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -46,19 +48,27 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public final class CounterServer implements Closeable {
private final RaftServer server;
- public CounterServer(RaftPeer peer, File storageDir) throws IOException {
+ public CounterServer(RaftPeer peer, File storageDir, TimeDuration simulatedSlowness) throws IOException {
//create a property object
final RaftProperties properties = new RaftProperties();
//set the storage directory (different for each peer) in the RaftProperty object
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
+ //Set the read policy to Linearizable Read.
+ //The default policy routes read-only requests to the leader, which queries its own state machine directly.
+ //Linearizable Read allows read-only requests to be routed to any group member
+ //and uses ReadIndex to guarantee strong consistency.
+ RaftServerConfigKeys.Read.setOption(properties, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
+ //set the linearizable read timeout
+ RaftServerConfigKeys.Read.setTimeout(properties, TimeDuration.ONE_MINUTE);
+
//set the port (different for each peer) in RaftProperty object
final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port);
//create the counter state machine which holds the counter value
- final CounterStateMachine counterStateMachine = new CounterStateMachine();
+ final CounterStateMachine counterStateMachine = new CounterStateMachine(simulatedSlowness);
//build the Raft server
this.server = RaftServer.newBuilder()
@@ -88,8 +98,10 @@ public final class CounterServer implements Closeable {
if (peerIndex < 0 || peerIndex > 2) {
throw new IllegalArgumentException("The server index must be 0, 1 or 2: peerIndex=" + peerIndex);
}
-
- startServer(peerIndex);
+ TimeDuration simulatedSlowness = Optional.ofNullable(Constants.SIMULATED_SLOWNESS)
+ .map(slownessList -> slownessList.get(peerIndex))
+ .orElse(TimeDuration.ZERO);
+ startServer(peerIndex, simulatedSlowness);
} catch(Throwable e) {
e.printStackTrace();
System.err.println();
@@ -102,13 +114,13 @@ public final class CounterServer implements Closeable {
}
}
- private static void startServer(int peerIndex) throws IOException {
+ private static void startServer(int peerIndex, TimeDuration simulatedSlowness) throws IOException {
//get peer and define storage dir
final RaftPeer currentPeer = Constants.PEERS.get(peerIndex);
final File storageDir = new File("./" + currentPeer.getId());
//start a counter server
- try(CounterServer counterServer = new CounterServer(currentPeer, storageDir)) {
+ try(CounterServer counterServer = new CounterServer(currentPeer, storageDir, simulatedSlowness)) {
counterServer.start();
//exit when any input entered
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index f383d3a83..299b67325 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -34,6 +34,7 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.TimeDuration;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -79,6 +80,15 @@ public class CounterStateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
private final AtomicInteger counter = new AtomicInteger(0);
+ private final TimeDuration simulatedSlowness;
+
+ CounterStateMachine(TimeDuration simulatedSlowness) {
+ this.simulatedSlowness = simulatedSlowness;
+ }
+ CounterStateMachine() {
+ this.simulatedSlowness = TimeDuration.ZERO;
+ }
+
/** @return the current state. */
private synchronized CounterState getState() {
return new CounterState(getLastAppliedTermIndex(), counter.get());
@@ -90,6 +100,14 @@ public class CounterStateMachine extends BaseStateMachine {
}
private synchronized int incrementCounter(TermIndex termIndex) {
+ try {
+ if (!simulatedSlowness.equals(TimeDuration.ZERO)) {
+ simulatedSlowness.sleep();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{}: get interrupted in simulated slowness sleep before apply transaction", this);
+ Thread.currentThread().interrupt();
+ }
updateLastAppliedTermIndex(termIndex);
return counter.incrementAndGet();
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java
index a2ab8e172..4377a1420 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/debug/server/Server.java
@@ -21,6 +21,7 @@ package org.apache.ratis.examples.debug.server;
import org.apache.ratis.examples.common.Constants;
import org.apache.ratis.examples.counter.server.CounterServer;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.TimeDuration;
import java.io.File;
import java.io.IOException;
@@ -45,7 +46,7 @@ public final class Server {
.findFirst().orElseThrow(() -> new IllegalArgumentException("Peer not found: " + args[0]));
final File storageDir = new File(Constants.PATH, currentPeer.getId().toString());
- final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
+ final CounterServer counterServer = new CounterServer(currentPeer, storageDir, TimeDuration.ZERO);
counterServer.start();
}
}
diff --git a/ratis-examples/src/main/resources/conf.properties b/ratis-examples/src/main/resources/conf.properties
index 839090101..8e0a901cd 100644
--- a/ratis-examples/src/main/resources/conf.properties
+++ b/ratis-examples/src/main/resources/conf.properties
@@ -15,4 +15,6 @@
# limitations under the License.
raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124
-# raft.server.root.storage.path
\ No newline at end of file
+# raft.server.root.storage.path
+# raft.server.priority.list=1,0,0
+# raft.server.simulated-slowness.list=0,1s,0
\ No newline at end of file