You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/08 16:47:30 UTC
[ignite-3] branch ignite-13885 updated: IGNITE-13885 Message
recording.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push:
new e343b42 IGNITE-13885 Message recording.
e343b42 is described below
commit e343b42b97f28fb9fa624fea966c3b3f089b8c2f
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Feb 8 19:47:20 2021 +0300
IGNITE-13885 Message recording.
---
.../java/com/alipay/sofa/jraft/rpc/RpcClient.java | 1 +
.../sofa/jraft/rpc/impl/LocalConnection.java | 66 ++++++++-
.../alipay/sofa/jraft/rpc/impl/LocalRpcClient.java | 45 ++++--
.../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 5 +-
.../java/com/alipay/sofa/jraft/util/Utils.java | 8 +
.../java/com/alipay/sofa/jraft/core/NodeTest.java | 36 +++++
.../com/alipay/sofa/jraft/rpc/LocalRpcTest.java | 161 ++++++++++++++++++++-
7 files changed, 299 insertions(+), 23 deletions(-)
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java
index f6ae218..33b0ccf 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java
@@ -41,6 +41,7 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @param endpoint target address
* @param createIfAbsent create a new one if there is no connection
* @return true if there is a connection and the connection is active and writable.
+ * TODO asch it probably should return com.alipay.sofa.jraft.rpc.Connection.
*/
boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
index 0607196..d1ea408 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -1,22 +1,72 @@
package com.alipay.sofa.jraft.rpc.impl;
import com.alipay.sofa.jraft.rpc.Connection;
+import com.alipay.sofa.jraft.rpc.Message;
import com.alipay.sofa.jraft.util.Endpoint;
import java.util.Collection;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
public class LocalConnection implements Connection {
+ private static boolean RECORD_ALL_MESSAGES = false;
+
private Map<String, Object> attrs = new ConcurrentHashMap<>();
- final LocalRpcClient client;
- final Endpoint srv;
+ public final LocalRpcClient client;
+ public final LocalRpcServer srv;
+
+ private volatile Predicate<Message> recordPred;
+ private volatile Predicate<Message> blockPred;
+
+ private LinkedBlockingQueue<Object[]> blockedMsgs = new LinkedBlockingQueue<>();
+ private LinkedBlockingQueue<Message> recordedMsgs = new LinkedBlockingQueue<>();
- public LocalConnection(LocalRpcClient client, Endpoint srv) {
+ public LocalConnection(LocalRpcClient client, LocalRpcServer srv) {
this.client = client;
this.srv = srv;
}
+ /**
+  * Installs the filter that decides which messages passing through this connection are recorded.
+  *
+  * @param pred Predicate tested against each outgoing request and each response; {@code null}
+  *      leaves only the global record-all switch in effect.
+  */
+ public void recordMessages(Predicate<Message> pred) {
+     recordPred = pred;
+ }
+
+ /**
+  * Installs the filter that decides which outgoing requests are held back instead of being sent.
+  *
+  * @param pred Predicate tested against each outgoing request; matching requests are parked
+  *      until {@link #sendBlocked()} is called. {@code null} disables blocking.
+  */
+ public void blockMessages(Predicate<Message> pred) {
+     blockPred = pred;
+ }
+
+ /**
+  * Puts a request on the incoming queue of the target server.
+  *
+  * @param request Request message to deliver.
+  * @param fut Future that travels with the request inside the queued tuple.
+  */
+ private void send(Message request, Future fut) {
+     Object[] tuple = {client, request, fut};
+
+     // The offer must not be evaluated inside the assert itself: with assertions disabled
+     // (the JVM default) the expression would be skipped and the message silently dropped.
+     boolean queued = srv.incoming.offer(tuple);
+
+     assert queued; // Should never fail because server uses unbounded queue.
+ }
+
+ /**
+  * Hook invoked before a request is handed to the server: records the request if it matches the
+  * record filter, then either parks it in the blocked queue or sends it.
+  *
+  * @param request Outgoing request.
+  * @param fut Future associated with the request.
+  */
+ public void onBeforeRequestSend(Message request, Future fut) {
+     // Read each volatile predicate exactly once, so a concurrent reset to null cannot slip in
+     // between the null check and the test() call.
+     Predicate<Message> record = recordPred;
+     Predicate<Message> block = blockPred;
+
+     if (RECORD_ALL_MESSAGES || record != null && record.test(request))
+         recordedMsgs.add(request);
+
+     if (block != null && block.test(request)) {
+         blockedMsgs.add(new Object[]{request, fut});
+
+         return;
+     }
+
+     send(request, fut);
+ }
+
+ /**
+  * Releases every request currently held back by the block filter and sends it to the server.
+  */
+ public void sendBlocked() {
+     // Blocked entries are {request, future} pairs, whereas the server queue is fed
+     // {client, request, future} triples by send(). Draining the pairs straight into the
+     // server queue would enqueue tuples of the wrong shape, so each one is re-sent instead.
+     Object[] blocked;
+
+     while ((blocked = blockedMsgs.poll()) != null)
+         send((Message) blocked[0], (Future) blocked[1]);
+ }
+
+ /**
+  * Hook invoked when a call completes: records the response if it matches the record filter.
+  *
+  * @param msg Response message, or {@code null} when the call failed.
+  * @param err Failure of the call, or {@code null} on success.
+  */
+ public void onAfterResponseSend(Message msg, Throwable err) {
+     // A failed call has no response to record. This hook runs inside the caller's completion
+     // callback, so asserting on err here would throw from that callback and keep the error from
+     // reaching the user. A null message must not reach the queue either, which rejects nulls.
+     if (err != null || msg == null)
+         return;
+
+     // Single read of the volatile predicate, see the null check below.
+     Predicate<Message> record = recordPred;
+
+     if (RECORD_ALL_MESSAGES || record != null && record.test(msg))
+         recordedMsgs.add(msg);
+ }
+
@Override public Object getAttribute(String key) {
return attrs.get(key);
}
@@ -30,6 +80,14 @@ public class LocalConnection implements Connection {
}
@Override public void close() {
- LocalRpcServer.closeConnection(client, srv);
+ LocalRpcServer.closeConnection(client, srv.local);
+ }
+
+ /**
+  * Exposes the messages recorded on this connection so far.
+  *
+  * @return The live recording queue itself (not a copy); polling it removes entries.
+  */
+ public Queue<Message> recordedMessages() {
+     return this.recordedMsgs;
+ }
+
+ /** Renders the connection as {@code <client> -> <server local endpoint>}. */
+ @Override public String toString() {
+     String from = client.toString();
+     String to = srv.local.toString();
+
+     return from + " -> " + to;
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
index 474e400..d00a5f7 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
@@ -21,18 +21,19 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.InvokeTimeoutException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.RpcOptions;
-import com.alipay.sofa.jraft.rpc.Connection;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
+import com.alipay.sofa.jraft.rpc.Message;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcUtils;
import com.alipay.sofa.jraft.util.Endpoint;
-import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Local rpc client impl.
@@ -40,7 +41,11 @@ import java.util.function.BiConsumer;
* @author ascherbakov.
*/
public class LocalRpcClient implements RpcClient {
- private volatile ReplicatorGroup replicatorGroup = null;
+ private static final Logger LOG = LoggerFactory.getLogger(LocalRpcClient.class);
+
+ public volatile ReplicatorGroup replicatorGroup = null;
+
+ public static Consumer<LocalConnection> onCreated = null;
@Override public boolean checkConnection(Endpoint endpoint) {
return LocalRpcServer.connect(this, endpoint, false, null);
@@ -61,12 +66,15 @@ public class LocalRpcClient implements RpcClient {
private void onCreated(LocalConnection conn) {
if (replicatorGroup != null) {
final PeerId peer = new PeerId();
- if (peer.parse(conn.srv.toString())) {
+ if (peer.parse(conn.srv.local.toString())) {
RpcUtils.runInThread(() -> replicatorGroup.checkReplicator(peer, true)); // Avoid deadlock.
}
else
- System.out.println("Fail to parse peer: {}" + peer); // TODO asch
+ LOG.warn("Failed to parse peer: {}", peer); // TODO asch how to handle ?
}
+
+ if (onCreated != null)
+ onCreated.accept(conn);
}
@Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException {
@@ -77,13 +85,16 @@ public class LocalRpcClient implements RpcClient {
if (srv == null)
throw new RemotingException("Server is dead " + endpoint);
- CompletableFuture fut = new CompletableFuture();
+ LocalConnection locConn = srv.conns.get(this);
+ if (locConn == null)
+ throw new RemotingException("Server is dead " + endpoint);
- Object[] tuple = {this, request, fut};
- assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+ CompletableFuture<Object> fut = new CompletableFuture();
+
+ locConn.onBeforeRequestSend((Message) request, fut);
try {
- return fut.get(timeoutMs, TimeUnit.MILLISECONDS);
+ return fut.whenComplete((res, err) -> locConn.onAfterResponseSend((Message) res, err)).get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new RemotingException(e);
} catch (TimeoutException e) {
@@ -99,18 +110,22 @@ public class LocalRpcClient implements RpcClient {
if (srv == null)
throw new RemotingException("Server is dead " + endpoint);
- CompletableFuture fut = new CompletableFuture();
+ LocalConnection locConn = srv.conns.get(this);
+ if (locConn == null)
+ throw new RemotingException("Server is dead " + endpoint);
+
+ CompletableFuture<Object> fut = new CompletableFuture<>();
- Object[] tuple = {this, request, fut};
- assert srv.incoming.offer(tuple);
+ locConn.onBeforeRequestSend((Message) request, fut);
- fut.whenComplete((BiConsumer<Object, Throwable>) (res, err) -> {
+ fut.whenComplete((res, err) -> {
+ locConn.onAfterResponseSend((Message) res, err);
RpcUtils.runInThread(() -> callback.complete(res, err)); // Avoid deadlocks if a closure has completed in the same thread.
}).orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
}
@Override public boolean init(RpcOptions opts) {
- return false;
+ return true;
}
@Override public void shutdown() {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index a24427d..db9648a 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -65,6 +65,7 @@ public class LocalRpcServer implements RpcServer {
BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch OOM is possible, handle that.
+ // TODO FIXME asch Or better use com.alipay.sofa.jraft.rpc.RpcUtils.RPC_CLOSURE_EXECUTOR ?
private ExecutorService defaultExecutor;
public LocalRpcServer(Endpoint local) {
@@ -75,7 +76,7 @@ public class LocalRpcServer implements RpcServer {
LocalRpcServer locSrv = servers.get(srv);
if (locSrv == null)
- return false; // Server is dead.
+ return false; // Server is not ready.
LocalConnection conn = locSrv.conns.get(client);
@@ -83,7 +84,7 @@ public class LocalRpcServer implements RpcServer {
if (!createIfAbsent)
return false;
- conn = new LocalConnection(client, srv);
+ conn = new LocalConnection(client, locSrv);
LocalConnection oldConn = locSrv.conns.putIfAbsent(client, conn);
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
index 586ec0a..4566076 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
@@ -30,6 +30,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
+import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -161,6 +162,13 @@ public final class Utils {
}
/**
+ * Runs a callable in the closure thread pool and returns the future object.
+ *
+ * @param runnable Task to submit (a {@link Callable}, despite the parameter name).
+ * @param <V> Type of the value produced by the task.
+ * @return Future returned by {@code CLOSURE_EXECUTOR.submit(runnable)}.
+ */
+ public static <V> Future<V> runInThread(final Callable<V> runnable) {
+ return CLOSURE_EXECUTOR.submit(runnable);
+ }
+
+ /**
* Run closure with status in thread pool.
*/
@SuppressWarnings("Convert2Lambda")
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
index 2aa3a84..dacbd8c 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
@@ -16,6 +16,10 @@
*/
package com.alipay.sofa.jraft.core;
+import com.alipay.sofa.jraft.rpc.impl.LocalConnection;
+import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient;
+import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService;
+import com.alipay.sofa.jraft.util.concurrent.ConcurrentHashSet;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -33,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -3389,6 +3394,37 @@ public class NodeTest {
}
}
+ /**
+  * Starts a three-node cluster with recording enabled on every created connection and checks
+  * that exactly two of those connections belong to the leader's RPC client.
+  */
+ @Test
+ public void testClusterWithMessageRecording() throws Exception {
+     Set<LocalConnection> conns = new ConcurrentHashSet<>();
+
+     LocalRpcClient.onCreated = conn -> {
+         conns.add(conn);
+         conn.recordMessages(msg -> true);
+     };
+
+     try {
+         final List<PeerId> peers = TestUtils.generatePeers(3);
+         final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
+
+         for (final PeerId peer : peers) {
+             assertTrue(cluster.start(peer.getEndpoint()));
+         }
+
+         cluster.waitLeader();
+
+         Thread.sleep(2_000);
+
+         NodeImpl leader = (NodeImpl) cluster.getLeader();
+         DefaultRaftClientService rpcService = (DefaultRaftClientService) leader.getRpcService();
+         LocalRpcClient localRpcClient = (LocalRpcClient) rpcService.getRpcClient();
+
+         List<LocalConnection> leaderConns = conns.stream().filter(loc -> loc.client == localRpcClient).collect(Collectors.toList());
+
+         assertEquals(2, leaderConns.size());
+
+         cluster.stopAll();
+     } finally {
+         // The hook is a static field: left installed, it would keep recording every message of
+         // every connection created by the tests that run later in the same JVM.
+         LocalRpcClient.onCreated = null;
+     }
+ }
+
private NodeOptions createNodeOptionsWithSharedTimer() {
final NodeOptions options = new NodeOptions();
options.setSharedElectionTimer(true);
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
index f34ebed..f748e92 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
@@ -2,20 +2,25 @@ package com.alipay.sofa.jraft.rpc;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.rpc.impl.LocalConnection;
import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient;
import com.alipay.sofa.jraft.rpc.impl.LocalRpcServer;
import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* TODO add test for localconn.close, timeouts.
@@ -120,9 +125,155 @@ public class LocalRpcTest {
assertFalse(client2.checkConnection(endpoint));
}
+ /**
+  * Checks that synchronous calls record only the messages matching the record filter: of the two
+  * request/response pairs exchanged, only {@code Request1} and {@code Response1} must be kept.
+  */
+ @Test
+ public void testRecordedSync() throws RemotingException, InterruptedException {
+     AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+     LocalRpcClient.onCreated = conn -> {
+         connRef.set(conn);
+         conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+     };
+
+     try {
+         RpcClient client1 = new LocalRpcClient();
+
+         assertTrue(client1.checkConnection(endpoint, true));
+
+         assertTrue(client1.invokeSync(endpoint, new Request1(), 500) instanceof Response1);
+         assertTrue(client1.invokeSync(endpoint, new Request2(), 500) instanceof Response2);
+
+         Queue<Message> recorded = connRef.get().recordedMessages();
+
+         assertEquals(2, recorded.size());
+         assertTrue(recorded.poll() instanceof Request1);
+         assertTrue(recorded.poll() instanceof Response1);
+     } finally {
+         // Static hook: reset it so it does not leak into the tests that run afterwards.
+         LocalRpcClient.onCreated = null;
+     }
+ }
+
+ /**
+  * Checks that a synchronous call which times out records the request but no response.
+  * The request carries {@code val == 10_000}, which makes the processor delay its reply
+  * beyond the 500 ms call timeout.
+  */
+ @Test
+ public void testRecordedSyncTimeout() throws RemotingException, InterruptedException {
+     AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+     LocalRpcClient.onCreated = conn -> {
+         connRef.set(conn);
+         conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+     };
+
+     try {
+         RpcClient client1 = new LocalRpcClient();
+
+         assertTrue(client1.checkConnection(endpoint, true));
+
+         try {
+             Request1 request = new Request1();
+             request.val = 10_000;
+             client1.invokeSync(endpoint, request, 500);
+
+             fail();
+         } catch (Exception e) {
+             // Expected.
+         }
+
+         Queue<Message> recorded = connRef.get().recordedMessages();
+
+         assertEquals(1, recorded.size());
+         assertTrue(recorded.poll() instanceof Request1);
+     } finally {
+         // Static hook: reset it so it does not leak into the tests that run afterwards.
+         LocalRpcClient.onCreated = null;
+     }
+ }
+
+ /**
+  * Checks that asynchronous calls record only the messages matching the record filter: of the
+  * two request/response pairs exchanged, only {@code Request1} and {@code Response1} must be kept.
+  */
+ @Test
+ public void testRecordedAsync() throws RemotingException, InterruptedException {
+     AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+     LocalRpcClient.onCreated = conn -> {
+         connRef.set(conn);
+         conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+     };
+
+     try {
+         RpcClient client1 = new LocalRpcClient();
+
+         assertTrue(client1.checkConnection(endpoint, true));
+
+         CountDownLatch l = new CountDownLatch(2);
+
+         client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> l.countDown(), 500);
+         client1.invokeAsync(endpoint, new Request2(), null, (result, err) -> l.countDown(), 500);
+
+         // Bounded wait: a lost callback must fail the test instead of hanging the build.
+         assertTrue(l.await(5, TimeUnit.SECONDS));
+
+         Queue<Message> recorded = connRef.get().recordedMessages();
+
+         assertEquals(2, recorded.size());
+         assertTrue(recorded.poll() instanceof Request1);
+         assertTrue(recorded.poll() instanceof Response1);
+     } finally {
+         // Static hook: reset it so it does not leak into the tests that run afterwards.
+         LocalRpcClient.onCreated = null;
+     }
+ }
+
+ /**
+  * Checks that an asynchronous call which times out reports an error to its callback and records
+  * the request but no response. The request carries {@code val == 10_000}, which makes the
+  * processor delay its reply beyond the 500 ms call timeout.
+  */
+ @Test
+ @Ignore
+ public void testRecordedAsyncTimeout() throws RemotingException, InterruptedException {
+     AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+     LocalRpcClient.onCreated = conn -> {
+         connRef.set(conn);
+         conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+     };
+
+     try {
+         RpcClient client1 = new LocalRpcClient();
+
+         assertTrue(client1.checkConnection(endpoint, true));
+
+         Request1 request = new Request1();
+         request.val = 10_000;
+
+         CountDownLatch l = new CountDownLatch(1);
+         AtomicReference<Throwable> errRef = new AtomicReference<>();
+
+         // TODO asch invokeasync with timeout not working.
+         // The slow request itself must be sent (not a fresh Request1), and the failure is
+         // delivered through the callback rather than thrown, so it is captured here.
+         client1.invokeAsync(endpoint, request, null, (result, err) -> {
+             errRef.set(err);
+             l.countDown();
+         }, 500);
+
+         assertTrue(l.await(5, TimeUnit.SECONDS));
+         assertNotNull(errRef.get());
+
+         Queue<Message> recorded = connRef.get().recordedMessages();
+
+         assertEquals(1, recorded.size());
+         assertTrue(recorded.poll() instanceof Request1);
+     } finally {
+         // Static hook: reset it so it does not leak into the tests that run afterwards.
+         LocalRpcClient.onCreated = null;
+     }
+ }
+
+ /**
+  * Placeholder for a test of blocked synchronous requests.
+  * The whole body is commented out, so this test currently passes without checking anything.
+  * NOTE(review): the disabled body installs its filter with recordMessages; a blocking test
+  * presumably needs blockMessages instead - confirm before enabling.
+  */
+ @Test
+ public void testBlockedSync() throws RemotingException, InterruptedException {
+// RpcClient client1 = new LocalRpcClient();
+//
+// assertTrue(client1.checkConnection(endpoint, true));
+//
+// LocalConnection conn = LocalRpcServer.servers.get(endpoint).conns.get(client1);
+//
+// assertNotNull(conn);
+//
+// conn.recordMessages(msg -> msg instanceof Request1);
+//
+// Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
+//
+// assertEquals(1, resp2.val);
+//
+// Future<Response1> resp = Utils.runInThread(() -> (Response1) client1.invokeSync(endpoint, new Request1(), 30_000));
+//
+// Thread.sleep(3_000);
+//
+// assertFalse(resp.isDone());
+ }
+
private static class Request1RpcProcessor implements RpcProcessor<Request1> {
@Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
- rpcCtx.sendResponse(new Response1());
+ if (request.val == 10_000)
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ // No-op.
+ }
+
+ Response1 resp1 = new Response1();
+ resp1.val = request.val + 1;
+ rpcCtx.sendResponse(resp1);
}
@Override public String interest() {
@@ -132,7 +283,9 @@ public class LocalRpcTest {
private static class Request2RpcProcessor implements RpcProcessor<Request2> {
@Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
- rpcCtx.sendResponse(new Response2());
+ Response2 resp2 = new Response2();
+ resp2.val = request.val + 1;
+ rpcCtx.sendResponse(resp2);
}
@Override public String interest() {
@@ -141,14 +294,18 @@ public class LocalRpcTest {
}
private static class Request1 implements Message {
+ int val;
}
private static class Request2 implements Message {
+ int val;
}
private static class Response1 implements Message {
+ int val;
}
private static class Response2 implements Message {
+ int val;
}
}