You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/08 16:47:30 UTC

[ignite-3] branch ignite-13885 updated: IGNITE-13885 Message recording.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-13885 by this push:
     new e343b42  IGNITE-13885 Message recording.
e343b42 is described below

commit e343b42b97f28fb9fa624fea966c3b3f089b8c2f
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Feb 8 19:47:20 2021 +0300

    IGNITE-13885 Message recording.
---
 .../java/com/alipay/sofa/jraft/rpc/RpcClient.java  |   1 +
 .../sofa/jraft/rpc/impl/LocalConnection.java       |  66 ++++++++-
 .../alipay/sofa/jraft/rpc/impl/LocalRpcClient.java |  45 ++++--
 .../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java |   5 +-
 .../java/com/alipay/sofa/jraft/util/Utils.java     |   8 +
 .../java/com/alipay/sofa/jraft/core/NodeTest.java  |  36 +++++
 .../com/alipay/sofa/jraft/rpc/LocalRpcTest.java    | 161 ++++++++++++++++++++-
 7 files changed, 299 insertions(+), 23 deletions(-)

diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java
index f6ae218..33b0ccf 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java
@@ -41,6 +41,7 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @param endpoint       target address
      * @param createIfAbsent create a new one if there is no connection
      * @return true if there is a connection and the connection is active and writable.
+     * TODO asch it probably should return com.alipay.sofa.jraft.rpc.Connection.
      */
     boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
 
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
index 0607196..d1ea408 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -1,22 +1,72 @@
 package com.alipay.sofa.jraft.rpc.impl;
 
 import com.alipay.sofa.jraft.rpc.Connection;
+import com.alipay.sofa.jraft.rpc.Message;
 import com.alipay.sofa.jraft.util.Endpoint;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
 
 public class LocalConnection implements Connection {
+    private static boolean RECORD_ALL_MESSAGES = false;
+
     private Map<String, Object> attrs = new ConcurrentHashMap<>();
 
-    final LocalRpcClient client;
-    final Endpoint srv;
+    public final LocalRpcClient client;
+    public final LocalRpcServer srv;
+
+    private volatile Predicate<Message> recordPred;
+    private volatile Predicate<Message> blockPred;
+
+    private LinkedBlockingQueue<Object[]> blockedMsgs = new LinkedBlockingQueue<>();
+    private LinkedBlockingQueue<Message> recordedMsgs = new LinkedBlockingQueue<>();
 
-    public LocalConnection(LocalRpcClient client, Endpoint srv) {
+    public LocalConnection(LocalRpcClient client, LocalRpcServer srv) {
         this.client = client;
         this.srv = srv;
     }
 
+    public void recordMessages(Predicate<Message> pred) {
+        this.recordPred = pred;
+    }
+
+    public void blockMessages(Predicate<Message> pred) {
+        this.blockPred = pred;
+    }
+
+    private void send(Message request, Future fut) {
+        Object[] tuple = {client, request, fut};
+        assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+    }
+
+    public void onBeforeRequestSend(Message request, Future fut) {
+        if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(request))
+            recordedMsgs.add(request);
+
+        if (blockPred != null && blockPred.test(request)) {
+            blockedMsgs.add(new Object[]{request, fut});
+
+            return;
+        }
+
+        send(request, fut);
+    }
+
+    public void sendBlocked() {
+        blockedMsgs.drainTo(srv.incoming);
+    }
+
+    public void onAfterResponseSend(Message msg, Throwable err) {
+        assert err == null : err;
+
+        if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(msg))
+            recordedMsgs.add(msg);
+    }
+
     @Override public Object getAttribute(String key) {
         return attrs.get(key);
     }
@@ -30,6 +80,14 @@ public class LocalConnection implements Connection {
     }
 
     @Override public void close() {
-        LocalRpcServer.closeConnection(client, srv);
+        LocalRpcServer.closeConnection(client, srv.local);
+    }
+
+    public Queue<Message> recordedMessages() {
+        return recordedMsgs;
+    }
+
+    @Override public String toString() {
+        return client.toString() + " -> " + srv.local.toString();
     }
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
index 474e400..d00a5f7 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
@@ -21,18 +21,19 @@ import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.error.InvokeTimeoutException;
 import com.alipay.sofa.jraft.error.RemotingException;
 import com.alipay.sofa.jraft.option.RpcOptions;
-import com.alipay.sofa.jraft.rpc.Connection;
 import com.alipay.sofa.jraft.rpc.InvokeCallback;
 import com.alipay.sofa.jraft.rpc.InvokeContext;
+import com.alipay.sofa.jraft.rpc.Message;
 import com.alipay.sofa.jraft.rpc.RpcClient;
 import com.alipay.sofa.jraft.rpc.RpcUtils;
 import com.alipay.sofa.jraft.util.Endpoint;
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Local rpc client impl.
@@ -40,7 +41,11 @@ import java.util.function.BiConsumer;
  * @author ascherbakov.
  */
 public class LocalRpcClient implements RpcClient {
-    private volatile ReplicatorGroup replicatorGroup = null;
+    private static final Logger LOG                    = LoggerFactory.getLogger(LocalRpcClient.class);
+
+    public volatile ReplicatorGroup replicatorGroup = null;
+
+    public static Consumer<LocalConnection> onCreated = null;
 
     @Override public boolean checkConnection(Endpoint endpoint) {
         return LocalRpcServer.connect(this, endpoint, false, null);
@@ -61,12 +66,15 @@ public class LocalRpcClient implements RpcClient {
     private void onCreated(LocalConnection conn) {
         if (replicatorGroup != null) {
             final PeerId peer = new PeerId();
-            if (peer.parse(conn.srv.toString())) {
+            if (peer.parse(conn.srv.local.toString())) {
                 RpcUtils.runInThread(() -> replicatorGroup.checkReplicator(peer, true)); // Avoid deadlock.
             }
             else
-                System.out.println("Fail to parse peer: {}" + peer); // TODO asch
+                LOG.warn("Failed to parse peer: {}", peer); // TODO asch how to handle ?
         }
+
+        if (onCreated != null)
+            onCreated.accept(conn);
     }
 
     @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException {
@@ -77,13 +85,16 @@ public class LocalRpcClient implements RpcClient {
         if (srv == null)
             throw new RemotingException("Server is dead " + endpoint);
 
-        CompletableFuture fut = new CompletableFuture();
+        LocalConnection locConn = srv.conns.get(this);
+        if (locConn == null)
+            throw new RemotingException("Server is dead " + endpoint);
 
-        Object[] tuple = {this, request, fut};
-        assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+        CompletableFuture<Object> fut = new CompletableFuture();
+
+        locConn.onBeforeRequestSend((Message) request, fut);
 
         try {
-            return fut.get(timeoutMs, TimeUnit.MILLISECONDS);
+            return fut.whenComplete((res, err) -> locConn.onAfterResponseSend((Message) res, err)).get(timeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw new RemotingException(e);
         } catch (TimeoutException e) {
@@ -99,18 +110,22 @@ public class LocalRpcClient implements RpcClient {
         if (srv == null)
             throw new RemotingException("Server is dead " + endpoint);
 
-        CompletableFuture fut = new CompletableFuture();
+        LocalConnection locConn = srv.conns.get(this);
+        if (locConn == null)
+            throw new RemotingException("Server is dead " + endpoint);
+
+        CompletableFuture<Object> fut = new CompletableFuture<>();
 
-        Object[] tuple = {this, request, fut};
-        assert srv.incoming.offer(tuple);
+        locConn.onBeforeRequestSend((Message) request, fut);
 
-        fut.whenComplete((BiConsumer<Object, Throwable>) (res, err) -> {
+        fut.whenComplete((res, err) -> {
+            locConn.onAfterResponseSend((Message) res, err);
             RpcUtils.runInThread(() -> callback.complete(res, err)); // Avoid deadlocks if a closure has completed in the same thread.
         }).orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
     }
 
     @Override public boolean init(RpcOptions opts) {
-        return false;
+        return true;
     }
 
     @Override public void shutdown() {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index a24427d..db9648a 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -65,6 +65,7 @@ public class LocalRpcServer implements RpcServer {
 
     BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch OOM is possible, handle that.
 
+    // TODO FIXME asch Or better use com.alipay.sofa.jraft.rpc.RpcUtils.RPC_CLOSURE_EXECUTOR ?
     private ExecutorService defaultExecutor;
 
     public LocalRpcServer(Endpoint local) {
@@ -75,7 +76,7 @@ public class LocalRpcServer implements RpcServer {
         LocalRpcServer locSrv = servers.get(srv);
 
         if (locSrv == null)
-            return false; // Server is dead.
+            return false; // Server is not ready.
 
         LocalConnection conn = locSrv.conns.get(client);
 
@@ -83,7 +84,7 @@ public class LocalRpcServer implements RpcServer {
             if (!createIfAbsent)
                 return false;
 
-            conn = new LocalConnection(client, srv);
+            conn = new LocalConnection(client, locSrv);
 
             LocalConnection oldConn = locSrv.conns.putIfAbsent(client, conn);
 
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
index 586ec0a..4566076 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
@@ -30,6 +30,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -161,6 +162,13 @@ public final class Utils {
     }
 
     /**
+     * Run a callable in thread pool,returns the future object.
+     */
+    public static <V> Future<V> runInThread(final Callable<V> runnable) {
+        return CLOSURE_EXECUTOR.submit(runnable);
+    }
+
+    /**
      * Run closure with status in thread pool.
      */
     @SuppressWarnings("Convert2Lambda")
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
index 2aa3a84..dacbd8c 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
@@ -16,6 +16,10 @@
  */
 package com.alipay.sofa.jraft.core;
 
+import com.alipay.sofa.jraft.rpc.impl.LocalConnection;
+import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient;
+import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService;
+import com.alipay.sofa.jraft.util.concurrent.ConcurrentHashSet;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -33,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import java.util.stream.Collectors;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -3389,6 +3394,37 @@ public class NodeTest {
         }
     }
 
+    @Test
+    public void testClusterWithMessageRecording() throws Exception {
+        Set<LocalConnection> conns = new ConcurrentHashSet<>();
+
+        LocalRpcClient.onCreated = conn -> {
+            conns.add(conn);
+            conn.recordMessages(msg -> true);
+        };
+
+        final List<PeerId> peers = TestUtils.generatePeers(3);
+        final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
+
+        for (final PeerId peer : peers) {
+            assertTrue(cluster.start(peer.getEndpoint()));
+        }
+
+        cluster.waitLeader();
+
+        Thread.sleep(2_000);
+
+        NodeImpl leader = (NodeImpl) cluster.getLeader();
+        DefaultRaftClientService rpcService = (DefaultRaftClientService) leader.getRpcService();
+        LocalRpcClient localRpcClient = (LocalRpcClient) rpcService.getRpcClient();
+
+        List<LocalConnection> leaderConns = conns.stream().filter(loc -> loc.client == localRpcClient).collect(Collectors.toList());
+
+        assertEquals(2, leaderConns.size());
+
+        cluster.stopAll();
+    }
+
     private NodeOptions createNodeOptionsWithSharedTimer() {
         final NodeOptions options = new NodeOptions();
         options.setSharedElectionTimer(true);
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
index f34ebed..f748e92 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
@@ -2,20 +2,25 @@ package com.alipay.sofa.jraft.rpc;
 
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.rpc.impl.LocalConnection;
 import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient;
 import com.alipay.sofa.jraft.rpc.impl.LocalRpcServer;
 import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * TODO add test for localconn.close, timeouts.
@@ -120,9 +125,155 @@ public class LocalRpcTest {
         assertFalse(client2.checkConnection(endpoint));
     }
 
+    @Test
+    public void testRecordedSync() throws RemotingException, InterruptedException {
+        AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+        LocalRpcClient.onCreated = conn -> {
+            connRef.set(conn);
+            conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+        };
+
+        RpcClient client1 = new LocalRpcClient();
+
+        assertTrue(client1.checkConnection(endpoint, true));
+
+        Response1 resp1 = (Response1) client1.invokeSync(endpoint, new Request1(), 500);
+        Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
+
+        Queue<Message> recorded = connRef.get().recordedMessages();
+
+        assertEquals(2, recorded.size());
+        assertTrue(recorded.poll() instanceof Request1);
+        assertTrue(recorded.poll() instanceof Response1);
+    }
+
+    @Test
+    public void testRecordedSyncTimeout() throws RemotingException, InterruptedException {
+        AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+        LocalRpcClient.onCreated = conn -> {
+            connRef.set(conn);
+            conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+        };
+
+        RpcClient client1 = new LocalRpcClient();
+
+        assertTrue(client1.checkConnection(endpoint, true));
+
+        try {
+            Request1 request = new Request1();
+            request.val = 10_000;
+            Response1 resp1 = (Response1) client1.invokeSync(endpoint, request, 500);
+
+            fail();
+        } catch (Exception e) {
+            // Expected.
+        }
+
+        Queue<Message> recorded = connRef.get().recordedMessages();
+
+        assertEquals(1, recorded.size());
+        assertTrue(recorded.poll() instanceof Request1);
+    }
+
+    @Test
+    public void testRecordedAsync() throws RemotingException, InterruptedException {
+        AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+        LocalRpcClient.onCreated = conn -> {
+            connRef.set(conn);
+            conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+        };
+
+        RpcClient client1 = new LocalRpcClient();
+
+        assertTrue(client1.checkConnection(endpoint, true));
+
+        CountDownLatch l = new CountDownLatch(2);
+
+        client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> l.countDown(), 500);
+        client1.invokeAsync(endpoint, new Request2(), null, (result, err) -> l.countDown(), 500);
+
+        l.await();
+
+        Queue<Message> recorded = connRef.get().recordedMessages();
+
+        assertEquals(2, recorded.size());
+        assertTrue(recorded.poll() instanceof Request1);
+        assertTrue(recorded.poll() instanceof Response1);
+    }
+
+    @Test
+    @Ignore
+    public void testRecordedAsyncTimeout() throws RemotingException, InterruptedException {
+        AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+        LocalRpcClient.onCreated = conn -> {
+            connRef.set(conn);
+            conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+        };
+
+        RpcClient client1 = new LocalRpcClient();
+
+        assertTrue(client1.checkConnection(endpoint, true));
+
+        try {
+            Request1 request = new Request1();
+            request.val = 10_000;
+            CountDownLatch l = new CountDownLatch(1);
+
+            // TODO asch invokeasync with timeout not working.
+            client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> l.countDown(), 500);
+
+            l.await();
+
+            fail();
+        } catch (Exception e) {
+            // Expected.
+        }
+
+        Queue<Message> recorded = connRef.get().recordedMessages();
+
+        assertEquals(1, recorded.size());
+        assertTrue(recorded.poll() instanceof Request1);
+    }
+
+    @Test
+    public void testBlockedSync() throws RemotingException, InterruptedException {
+//        RpcClient client1 = new LocalRpcClient();
+//
+//        assertTrue(client1.checkConnection(endpoint, true));
+//
+//        LocalConnection conn = LocalRpcServer.servers.get(endpoint).conns.get(client1);
+//
+//        assertNotNull(conn);
+//
+//        conn.recordMessages(msg -> msg instanceof Request1);
+//
+//        Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
+//
+//        assertEquals(1, resp2.val);
+//
+//        Future<Response1> resp = Utils.runInThread(() -> (Response1) client1.invokeSync(endpoint, new Request1(), 30_000));
+//
+//        Thread.sleep(3_000);
+//
+//        assertFalse(resp.isDone());
+    }
+
     private static class Request1RpcProcessor implements RpcProcessor<Request1> {
         @Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
-            rpcCtx.sendResponse(new Response1());
+            if (request.val == 10_000)
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignored) {
+                    // No-op.
+                }
+
+            Response1 resp1 = new Response1();
+            resp1.val = request.val + 1;
+            rpcCtx.sendResponse(resp1);
         }
 
         @Override public String interest() {
@@ -132,7 +283,9 @@ public class LocalRpcTest {
 
     private static class Request2RpcProcessor implements RpcProcessor<Request2> {
         @Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
-            rpcCtx.sendResponse(new Response2());
+            Response2 resp2 = new Response2();
+            resp2.val = request.val + 1;
+            rpcCtx.sendResponse(resp2);
         }
 
         @Override public String interest() {
@@ -141,14 +294,18 @@ public class LocalRpcTest {
     }
 
     private static class Request1 implements Message {
+        int val;
     }
 
     private static class Request2 implements Message {
+        int val;
     }
 
     private static class Response1 implements Message {
+        int val;
     }
 
     private static class Response2 implements Message {
+        int val;
     }
 }