You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/09 16:26:13 UTC

[ignite-3] branch ignite-13885 updated: IGNITE-13885 Timestamps for recorded.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-13885 by this push:
     new c9b34ca  IGNITE-13885 Timestamps for recorded.
c9b34ca is described below

commit c9b34ca77e77f0468e5a134ffd2c722dbbc5bad8
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Tue Feb 9 19:25:59 2021 +0300

    IGNITE-13885 Timestamps for recorded.
---
 .../alipay/sofa/jraft/rpc/impl/LocalConnection.java  |  9 +++++----
 .../java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java | 20 ++++++++++----------
 2 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
index d1ea408..db63614 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -2,6 +2,7 @@ package com.alipay.sofa.jraft.rpc.impl;
 
 import com.alipay.sofa.jraft.rpc.Connection;
 import com.alipay.sofa.jraft.rpc.Message;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
 import com.alipay.sofa.jraft.util.Endpoint;
 import java.util.Collection;
 import java.util.Map;
@@ -23,7 +24,7 @@ public class LocalConnection implements Connection {
     private volatile Predicate<Message> blockPred;
 
     private LinkedBlockingQueue<Object[]> blockedMsgs = new LinkedBlockingQueue<>();
-    private LinkedBlockingQueue<Message> recordedMsgs = new LinkedBlockingQueue<>();
+    private LinkedBlockingQueue<Object[]> recordedMsgs = new LinkedBlockingQueue<>();
 
     public LocalConnection(LocalRpcClient client, LocalRpcServer srv) {
         this.client = client;
@@ -45,7 +46,7 @@ public class LocalConnection implements Connection {
 
     public void onBeforeRequestSend(Message request, Future fut) {
         if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(request))
-            recordedMsgs.add(request);
+            recordedMsgs.add(new Object[]{System.currentTimeMillis(), request});
 
         if (blockPred != null && blockPred.test(request)) {
             blockedMsgs.add(new Object[]{request, fut});
@@ -64,7 +65,7 @@ public class LocalConnection implements Connection {
         assert err == null : err;
 
         if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(msg))
-            recordedMsgs.add(msg);
+            recordedMsgs.add(new Object[] {System.currentTimeMillis(), msg});
     }
 
     @Override public Object getAttribute(String key) {
@@ -83,7 +84,7 @@ public class LocalConnection implements Connection {
         LocalRpcServer.closeConnection(client, srv.local);
     }
 
-    public Queue<Message> recordedMessages() {
+    public Queue<Object[]> recordedMessages() {
         return recordedMsgs;
     }
 
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
index f748e92..0a9b08a 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
@@ -141,11 +141,11 @@ public class LocalRpcTest {
         Response1 resp1 = (Response1) client1.invokeSync(endpoint, new Request1(), 500);
         Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
 
-        Queue<Message> recorded = connRef.get().recordedMessages();
+        Queue<Object[]> recorded = connRef.get().recordedMessages();
 
         assertEquals(2, recorded.size());
-        assertTrue(recorded.poll() instanceof Request1);
-        assertTrue(recorded.poll() instanceof Response1);
+        assertTrue(recorded.poll()[1] instanceof Request1);
+        assertTrue(recorded.poll()[1] instanceof Response1);
     }
 
     @Test
@@ -171,10 +171,10 @@ public class LocalRpcTest {
             // Expected.
         }
 
-        Queue<Message> recorded = connRef.get().recordedMessages();
+        Queue<Object[]> recorded = connRef.get().recordedMessages();
 
         assertEquals(1, recorded.size());
-        assertTrue(recorded.poll() instanceof Request1);
+        assertTrue(recorded.poll()[1] instanceof Request1);
     }
 
     @Test
@@ -197,11 +197,11 @@ public class LocalRpcTest {
 
         l.await();
 
-        Queue<Message> recorded = connRef.get().recordedMessages();
+        Queue<Object[]> recorded = connRef.get().recordedMessages();
 
         assertEquals(2, recorded.size());
-        assertTrue(recorded.poll() instanceof Request1);
-        assertTrue(recorded.poll() instanceof Response1);
+        assertTrue(recorded.poll()[1] instanceof Request1);
+        assertTrue(recorded.poll()[1] instanceof Response1);
     }
 
     @Test
@@ -233,10 +233,10 @@ public class LocalRpcTest {
             // Expected.
         }
 
-        Queue<Message> recorded = connRef.get().recordedMessages();
+        Queue<Object[]> recorded = connRef.get().recordedMessages();
 
         assertEquals(1, recorded.size());
-        assertTrue(recorded.poll() instanceof Request1);
+        assertTrue(recorded.poll()[1] instanceof Request1);
     }
 
     @Test