You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/11/26 09:02:33 UTC
[incubator-tuweni] branch master updated: Add listeners for connection and disconnection
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 3b808d6 Add listeners for connection and disconnection
new d71c5e6 Merge pull request #175 from atoulme/add_connection_disconnection_listeners
3b808d6 is described below
commit 3b808d62883b360e8b04b3f7aa2a178b1ba6d644
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed Nov 25 23:06:52 2020 -0800
Add listeners for connection and disconnection
---
.../org/apache/tuweni/devp2p/EthereumNodeRecord.kt | 1 +
.../WireConnectionPeerRepositoryAdapter.kt | 22 ++++++++++
.../tuweni/rlpx/vertx/VertxRLPxServiceTest.java | 49 ++++++++++++++++++++--
.../rlpx/MemoryWireConnectionsRepository.java | 30 +++++++++++++
.../tuweni/rlpx/WireConnectionRepository.java | 28 +++++++++++++
.../tuweni/rlpx/wire/DefaultWireConnection.java | 28 +++++++++++--
.../apache/tuweni/rlpx/wire/WireConnection.java | 33 +++++++++++++++
.../rlpx/MemoryWireConnectionsRepositoryTest.java | 20 +++++++++
.../rlpx/wire/DefaultWireConnectionTest.java | 15 ++++++-
.../org/apache/tuweni/rlpx/wire/PingPongTest.java | 5 ++-
10 files changed, 222 insertions(+), 9 deletions(-)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
index 2ed1bca..b6b1137 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
@@ -282,6 +282,7 @@ class EthereumNodeRecord(
* @return the hash of the public key
*/
fun nodeId() = EthereumNodeRecord.nodeId(publicKey())
+
/**
* The ip associated with the ENR
* @return The IP adress of the ENR
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
index e4b301a..4baa039 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
@@ -25,6 +25,17 @@ import java.util.concurrent.ConcurrentHashMap
class WireConnectionPeerRepositoryAdapter(val peerRepository: PeerRepository) : WireConnectionRepository {
+ private val connectionListeners = ArrayList<WireConnectionRepository.Listener>()
+ private val disconnectionListeners = ArrayList<WireConnectionRepository.Listener>()
+
+ override fun addConnectionListener(listener: WireConnectionRepository.Listener) {
+ connectionListeners.add(listener)
+ }
+
+ override fun addDisconnectionListener(listener: WireConnectionRepository.Listener) {
+ disconnectionListeners.add(listener)
+ }
+
private val connections = ConcurrentHashMap<String, WireConnection>()
override fun add(wireConnection: WireConnection): String {
@@ -33,6 +44,17 @@ class WireConnectionPeerRepositoryAdapter(val peerRepository: PeerRepository) :
val peer = peerRepository.storePeer(id, Instant.now(), Instant.now())
peerRepository.addConnection(peer, id)
connections[id.id()] = wireConnection
+ wireConnection.registerListener {
+ if (it == WireConnection.Event.CONNECTED) {
+ for (listener in connectionListeners) {
+ listener.connectionEvent(wireConnection)
+ }
+ } else if (it == WireConnection.Event.DISCONNECTED) {
+ for (listener in disconnectionListeners) {
+ listener.connectionEvent(wireConnection)
+ }
+ }
+ }
return id.id()
}
diff --git a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
index 6270eb9..0ed9795 100644
--- a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
+++ b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
@@ -13,6 +13,7 @@
package org.apache.tuweni.rlpx.vertx;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -26,15 +27,19 @@ import org.apache.tuweni.junit.BouncyCastleExtension;
import org.apache.tuweni.junit.VertxExtension;
import org.apache.tuweni.junit.VertxInstance;
import org.apache.tuweni.rlpx.MemoryWireConnectionsRepository;
+import org.apache.tuweni.rlpx.RLPxService;
import org.apache.tuweni.rlpx.wire.DisconnectReason;
import org.apache.tuweni.rlpx.wire.SubProtocol;
import org.apache.tuweni.rlpx.wire.SubProtocolClient;
+import org.apache.tuweni.rlpx.wire.SubProtocolHandler;
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import org.apache.tuweni.rlpx.wire.WireConnection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.Test;
@@ -156,20 +161,56 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
MemoryWireConnectionsRepository repository = new MemoryWireConnectionsRepository();
+ AtomicBoolean called = new AtomicBoolean();
+ repository.addConnectionListener(conn -> called.set(true));
+ List<SubProtocol> protocols = Collections.singletonList(new SubProtocol() {
+ @Override
+ public SubProtocolIdentifier id() {
+ return SubProtocolIdentifier.of("eth", 63);
+ }
+
+ @Override
+ public boolean supports(SubProtocolIdentifier subProtocolIdentifier) {
+ return false;
+ }
+
+ @Override
+ public int versionRange(int version) {
+ return 0;
+ }
+
+ @Override
+ public SubProtocolHandler createHandler(RLPxService service, SubProtocolClient client) {
+ SubProtocolHandler handler = mock(SubProtocolHandler.class);
+ when(handler.stop()).thenReturn(AsyncCompletion.COMPLETED);
+ return handler;
+ }
+
+ @Override
+ public SubProtocolClient createClient(RLPxService service) {
+ return mock(SubProtocolClient.class);
+ }
+ });
VertxRLPxService service =
- new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", repository);
+ new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", repository);
service.start().join();
MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
VertxRLPxService peerService =
- new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", peerRepository);
+ new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", peerRepository);
peerService.start().join();
try {
- service.connectTo(peerPair.publicKey(), new InetSocketAddress("localhost", peerService.actualPort()));
- Thread.sleep(3000);
+ WireConnection conn =
+ service.connectTo(peerPair.publicKey(), new InetSocketAddress("localhost", peerService.actualPort())).get();
+ assertNotNull(conn);
assertEquals(1, repository.asMap().size());
+ AtomicBoolean disconnect = new AtomicBoolean();
+ repository.addDisconnectionListener(c -> disconnect.set(true));
+ service.disconnect(conn, DisconnectReason.CLIENT_QUITTING);
+ assertTrue(disconnect.get());
+
} finally {
service.stop();
peerService.stop();
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
index 8adc6b1..fe3c5e7 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
@@ -12,9 +12,13 @@
*/
package org.apache.tuweni.rlpx;
+import static org.apache.tuweni.rlpx.wire.WireConnection.Event.*;
+
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import org.apache.tuweni.rlpx.wire.WireConnection;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,10 +32,26 @@ public class MemoryWireConnectionsRepository implements WireConnectionRepository
private final Map<String, WireConnection> connections = new ConcurrentHashMap<>();
+ private final List<Listener> connectionListeners = new ArrayList<>();
+
+ private final List<Listener> disconnectionListeners = new ArrayList<>();
+
@Override
public String add(WireConnection wireConnection) {
String id = UUID.randomUUID().toString();
connections.put(id, wireConnection);
+ wireConnection.registerListener((event) -> {
+ if (event == CONNECTED) {
+ for (Listener listener : connectionListeners) {
+ listener.connectionEvent(wireConnection);
+ }
+ } else if (event == DISCONNECTED) {
+ connections.remove(id);
+ for (Listener listener : disconnectionListeners) {
+ listener.connectionEvent(wireConnection);
+ }
+ }
+ });
return id;
}
@@ -58,4 +78,14 @@ public class MemoryWireConnectionsRepository implements WireConnectionRepository
public Map<String, WireConnection> asMap() {
return connections;
}
+
+ @Override
+ public void addConnectionListener(Listener listener) {
+ connectionListeners.add(listener);
+ }
+
+ @Override
+ public void addDisconnectionListener(Listener listener) {
+ disconnectionListeners.add(listener);
+ }
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
index d239a49..9807a02 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
@@ -24,6 +24,19 @@ import javax.annotation.Nullable;
public interface WireConnectionRepository {
/**
+ * Connection listener
+ */
+ interface Listener {
+
+ /**
+ * Callback triggered when a connection changes
+ *
+ * @param conn the connection whose state changed
+ */
+ void connectionEvent(WireConnection conn);
+ }
+
+ /**
* Adds a new wire connection to the repository.
*
* @param wireConnection the new wire connection
@@ -62,4 +75,19 @@ public interface WireConnectionRepository {
*
*/
void close();
+
+ /**
+ * Adds a listener called when a connection occurs, i.e., when the connection is established and capabilities
+ * have been exchanged.
+ *
+ * @param listener the listener
+ */
+ void addConnectionListener(Listener listener);
+
+ /**
+ * Adds a listener called when a disconnection occurs, whether initiated by us or by the peer.
+ *
+ * @param listener the listener
+ */
+ void addDisconnectionListener(Listener listener);
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index e887214..70fd081 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -19,6 +19,8 @@ import org.apache.tuweni.concurrent.CompletableAsyncResult;
import org.apache.tuweni.crypto.SECP256K1;
import org.apache.tuweni.rlpx.RLPxMessage;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -59,6 +61,7 @@ public final class DefaultWireConnection implements WireConnection {
private DisconnectReason disconnectReason;
private boolean disconnectRequested;
private boolean disconnectReceived;
+ private EventListener eventListener = event -> {}; // no-op default so events fired before registerListener() cannot NPE
/**
* Default constructor.
@@ -150,17 +153,21 @@ public final class DefaultWireConnection implements WireConnection {
.stream()
.map(subprotocols::get)
.map(handler -> handler.handleNewPeerConnection(this)));
- allSubProtocols.thenRun(() -> ready.complete(this));
+ allSubProtocols.thenRun(() -> {
+ ready.complete(this);
+ eventListener.onEvent(Event.CONNECTED);
+ });
return;
} else if (message.messageId() == 1) {
DisconnectMessage disconnect = DisconnectMessage.read(message.content());
logger.info("Received disconnect {}", disconnect);
- disconnectHandler.run();
disconnectReceived = true;
disconnectReason = DisconnectReason.valueOf(disconnect.reason());
+ disconnectHandler.run();
if (!ready.isDone()) {
ready.cancel();
}
+ eventListener.onEvent(Event.DISCONNECTED);
return;
}
@@ -233,9 +240,10 @@ public final class DefaultWireConnection implements WireConnection {
public void disconnect(DisconnectReason reason) {
logger.debug("Sending disconnect message with reason {}", reason);
writer.accept(new RLPxMessage(1, new DisconnectMessage(reason).toBytes()));
- disconnectHandler.run();
disconnectRequested = true;
disconnectReason = reason;
+ disconnectHandler.run();
+ eventListener.onEvent(Event.DISCONNECTED);
}
/**
@@ -286,6 +294,15 @@ public final class DefaultWireConnection implements WireConnection {
return false;
}
+ @Override
+ public Collection<SubProtocolIdentifier> agreedSubprotocols() {
+ List<SubProtocolIdentifier> identifiers = new ArrayList<>();
+ for (SubProtocol sp : subprotocolRangeMap.asMapOfRanges().values()) {
+ identifiers.addAll(sp.getCapabilities());
+ }
+ return identifiers;
+ }
+
public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int messageType, Bytes message) {
logger.trace("Sending sub-protocol message {} {}", messageType, message);
Integer offset = null;
@@ -344,4 +361,9 @@ public final class DefaultWireConnection implements WireConnection {
public HelloMessage getPeerHello() {
return peerHelloMessage;
}
+
+ @Override
+ public void registerListener(EventListener listener) {
+ eventListener = listener;
+ }
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
index 5b407dd..83ba7a6 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
@@ -15,12 +15,31 @@ package org.apache.tuweni.rlpx.wire;
import org.apache.tuweni.crypto.SECP256K1;
+import java.util.Collection;
+
/**
* A stateful connection between two peers under the Devp2p wire protocol.
*/
public interface WireConnection {
/**
+ * Listener of events occurring on the connection.
+ */
+ interface EventListener {
+
+ /**
+ * Callback for the listener to be notified of events.
+ *
+ * @param event the event type
+ */
+ void onEvent(Event event);
+ }
+
+ public enum Event {
+ CONNECTED, DISCONNECTED;
+ }
+
+ /**
* Returns true if the connection supports the subprotocol
*
* @param subProtocolIdentifier the subprotocol identifier
@@ -29,6 +48,13 @@ public interface WireConnection {
boolean supports(SubProtocolIdentifier subProtocolIdentifier);
/**
+ * Provides the subprotocols the connection supports in common with the service.
+ *
+ * @return agreed subprotocols
+ */
+ Collection<SubProtocolIdentifier> agreedSubprotocols();
+
+ /**
* Whether the peer asked to disconnect
*
* @return true if the peer asked to disconnect
@@ -85,4 +111,11 @@ public interface WireConnection {
* @return the hello message the remote peer sent.
*/
HelloMessage getPeerHello();
+
+ /**
+ * Registers a listener to consume events sent by this connection.
+ *
+ * @param listener the function called when an event triggers
+ */
+ void registerListener(EventListener listener);
}
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java
index 216a46a..9834e13 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java
@@ -16,12 +16,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import org.apache.tuweni.rlpx.wire.WireConnection;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
class MemoryWireConnectionsRepositoryTest {
@@ -44,4 +47,21 @@ class MemoryWireConnectionsRepositoryTest {
assertFalse(iter.hasNext());
}
+ @Test
+ void testListeners() {
+ MemoryWireConnectionsRepository repo = new MemoryWireConnectionsRepository();
+ WireConnection conn = mock(WireConnection.class);
+ ArgumentCaptor<WireConnection.EventListener> listener = ArgumentCaptor.forClass(WireConnection.EventListener.class);
+ repo.add(conn);
+ verify(conn).registerListener(listener.capture());
+ AtomicBoolean called = new AtomicBoolean(false);
+ repo.addConnectionListener(c -> called.set(true));
+ listener.getValue().onEvent(WireConnection.Event.CONNECTED);
+ assertTrue(called.get());
+ AtomicBoolean calledDisconnect = new AtomicBoolean(false);
+ repo.addDisconnectionListener(c -> calledDisconnect.set(true));
+ listener.getValue().onEvent(WireConnection.Event.DISCONNECTED);
+ assertTrue(calledDisconnect.get());
+ }
+
}
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
index 4f0f4d2..9cf7719 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
@@ -40,7 +40,8 @@ class DefaultWireConnectionTest {
new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 3, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
-
+ conn.registerListener(event -> {
+ });
conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY));
assertEquals(1, capturedDisconnect.get().messageId());
DisconnectMessage msg = DisconnectMessage.read(capturedDisconnect.get().content());
@@ -55,6 +56,8 @@ class DefaultWireConnectionTest {
new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 4, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+ conn.registerListener(event -> {
+ });
conn.sendHello();
conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY));
assertEquals(1, capturedDisconnect.get().messageId());
@@ -70,6 +73,8 @@ class DefaultWireConnectionTest {
new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 28, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+ conn.registerListener(event -> {
+ });
conn.sendHello();
conn
.messageReceived(
@@ -92,6 +97,8 @@ class DefaultWireConnectionTest {
new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+ conn.registerListener(event -> {
+ });
conn.sendHello();
conn
.messageReceived(
@@ -110,6 +117,8 @@ class DefaultWireConnectionTest {
new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+ conn.registerListener(event -> {
+ });
conn.sendHello();
conn
.messageReceived(
@@ -129,6 +138,8 @@ class DefaultWireConnectionTest {
DefaultWireConnection conn = new DefaultWireConnection(nodeId, nodeId, capturedDisconnect::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 33, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+ conn.registerListener(event -> {
+ });
conn.sendHello();
conn
.messageReceived(
@@ -147,6 +158,8 @@ class DefaultWireConnectionTest {
new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 5, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+ conn.registerListener(event -> {
+ });
conn.sendHello();
conn
.messageReceived(
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
index e267588..0279818 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
@@ -55,6 +55,8 @@ class PingPongTest {
AsyncResult.incomplete(),
"127.0.0.1",
1234);
+ conn.registerListener(event -> {
+ });
AsyncCompletion completion = conn.sendPing();
assertFalse(completion.isDone());
@@ -70,7 +72,8 @@ class PingPongTest {
DefaultWireConnection conn = new DefaultWireConnection(nodeId, peerNodeId, capturedPong::set, helloMessage -> {
}, () -> {
}, new LinkedHashMap<>(), 1, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
-
+ conn.registerListener(event -> {
+ });
conn.messageReceived(new RLPxMessage(2, Bytes.EMPTY));
assertNotNull(capturedPong.get());
assertEquals(3, capturedPong.get().messageId());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org