You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/09 16:26:13 UTC
[ignite-3] branch ignite-13885 updated: IGNITE-13885 Timestamps for
recorded.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push:
new c9b34ca IGNITE-13885 Timestamps for recorded.
c9b34ca is described below
commit c9b34ca77e77f0468e5a134ffd2c722dbbc5bad8
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Tue Feb 9 19:25:59 2021 +0300
IGNITE-13885 Timestamps for recorded.
---
.../alipay/sofa/jraft/rpc/impl/LocalConnection.java | 9 +++++----
.../java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java | 20 ++++++++++----------
2 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
index d1ea408..db63614 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -2,6 +2,7 @@ package com.alipay.sofa.jraft.rpc.impl;
import com.alipay.sofa.jraft.rpc.Connection;
import com.alipay.sofa.jraft.rpc.Message;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.util.Endpoint;
import java.util.Collection;
import java.util.Map;
@@ -23,7 +24,7 @@ public class LocalConnection implements Connection {
private volatile Predicate<Message> blockPred;
private LinkedBlockingQueue<Object[]> blockedMsgs = new LinkedBlockingQueue<>();
- private LinkedBlockingQueue<Message> recordedMsgs = new LinkedBlockingQueue<>();
+ private LinkedBlockingQueue<Object[]> recordedMsgs = new LinkedBlockingQueue<>();
public LocalConnection(LocalRpcClient client, LocalRpcServer srv) {
this.client = client;
@@ -45,7 +46,7 @@ public class LocalConnection implements Connection {
public void onBeforeRequestSend(Message request, Future fut) {
if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(request))
- recordedMsgs.add(request);
+ recordedMsgs.add(new Object[]{System.currentTimeMillis(), request});
if (blockPred != null && blockPred.test(request)) {
blockedMsgs.add(new Object[]{request, fut});
@@ -64,7 +65,7 @@ public class LocalConnection implements Connection {
assert err == null : err;
if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(msg))
- recordedMsgs.add(msg);
+ recordedMsgs.add(new Object[] {System.currentTimeMillis(), msg});
}
@Override public Object getAttribute(String key) {
@@ -83,7 +84,7 @@ public class LocalConnection implements Connection {
LocalRpcServer.closeConnection(client, srv.local);
}
- public Queue<Message> recordedMessages() {
+ public Queue<Object[]> recordedMessages() {
return recordedMsgs;
}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
index f748e92..0a9b08a 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
@@ -141,11 +141,11 @@ public class LocalRpcTest {
Response1 resp1 = (Response1) client1.invokeSync(endpoint, new Request1(), 500);
Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
- Queue<Message> recorded = connRef.get().recordedMessages();
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
assertEquals(2, recorded.size());
- assertTrue(recorded.poll() instanceof Request1);
- assertTrue(recorded.poll() instanceof Response1);
+ assertTrue(recorded.poll()[1] instanceof Request1);
+ assertTrue(recorded.poll()[1] instanceof Response1);
}
@Test
@@ -171,10 +171,10 @@ public class LocalRpcTest {
// Expected.
}
- Queue<Message> recorded = connRef.get().recordedMessages();
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
assertEquals(1, recorded.size());
- assertTrue(recorded.poll() instanceof Request1);
+ assertTrue(recorded.poll()[1] instanceof Request1);
}
@Test
@@ -197,11 +197,11 @@ public class LocalRpcTest {
l.await();
- Queue<Message> recorded = connRef.get().recordedMessages();
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
assertEquals(2, recorded.size());
- assertTrue(recorded.poll() instanceof Request1);
- assertTrue(recorded.poll() instanceof Response1);
+ assertTrue(recorded.poll()[1] instanceof Request1);
+ assertTrue(recorded.poll()[1] instanceof Response1);
}
@Test
@@ -233,10 +233,10 @@ public class LocalRpcTest {
// Expected.
}
- Queue<Message> recorded = connRef.get().recordedMessages();
+ Queue<Object[]> recorded = connRef.get().recordedMessages();
assertEquals(1, recorded.size());
- assertTrue(recorded.poll() instanceof Request1);
+ assertTrue(recorded.poll()[1] instanceof Request1);
}
@Test