You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/11/26 09:02:33 UTC

[incubator-tuweni] branch master updated: Add listeners for connection and disconnection

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b808d6  Add listeners for connection and disconnection
     new d71c5e6  Merge pull request #175 from atoulme/add_connection_disconnection_listeners
3b808d6 is described below

commit 3b808d62883b360e8b04b3f7aa2a178b1ba6d644
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed Nov 25 23:06:52 2020 -0800

    Add listeners for connection and disconnection
---
 .../org/apache/tuweni/devp2p/EthereumNodeRecord.kt |  1 +
 .../WireConnectionPeerRepositoryAdapter.kt         | 22 ++++++++++
 .../tuweni/rlpx/vertx/VertxRLPxServiceTest.java    | 49 ++++++++++++++++++++--
 .../rlpx/MemoryWireConnectionsRepository.java      | 30 +++++++++++++
 .../tuweni/rlpx/WireConnectionRepository.java      | 28 +++++++++++++
 .../tuweni/rlpx/wire/DefaultWireConnection.java    | 28 +++++++++++--
 .../apache/tuweni/rlpx/wire/WireConnection.java    | 33 +++++++++++++++
 .../rlpx/MemoryWireConnectionsRepositoryTest.java  | 20 +++++++++
 .../rlpx/wire/DefaultWireConnectionTest.java       | 15 ++++++-
 .../org/apache/tuweni/rlpx/wire/PingPongTest.java  |  5 ++-
 10 files changed, 222 insertions(+), 9 deletions(-)

diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
index 2ed1bca..b6b1137 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
@@ -282,6 +282,7 @@ class EthereumNodeRecord(
    * @return the hash of the public key
    */
   fun nodeId() = EthereumNodeRecord.nodeId(publicKey())
+
   /**
    * The ip associated with the ENR
    * @return The IP adress of the ENR
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
index e4b301a..4baa039 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
@@ -25,6 +25,17 @@ import java.util.concurrent.ConcurrentHashMap
 
 class WireConnectionPeerRepositoryAdapter(val peerRepository: PeerRepository) : WireConnectionRepository {
 
+  private val connectionListeners = ArrayList<WireConnectionRepository.Listener>()
+  private val disconnectionListeners = ArrayList<WireConnectionRepository.Listener>()
+
+  override fun addConnectionListener(listener: WireConnectionRepository.Listener) {
+    connectionListeners.add(listener)
+  }
+
+  override fun addDisconnectionListener(listener: WireConnectionRepository.Listener) {
+    disconnectionListeners.add(listener)
+  }
+
   private val connections = ConcurrentHashMap<String, WireConnection>()
 
   override fun add(wireConnection: WireConnection): String {
@@ -33,6 +44,17 @@ class WireConnectionPeerRepositoryAdapter(val peerRepository: PeerRepository) :
     val peer = peerRepository.storePeer(id, Instant.now(), Instant.now())
     peerRepository.addConnection(peer, id)
     connections[id.id()] = wireConnection
+    wireConnection.registerListener {
+      if (it == WireConnection.Event.CONNECTED) {
+        for (listener in connectionListeners) {
+          listener.connectionEvent(wireConnection)
+        }
+      } else if (it == WireConnection.Event.DISCONNECTED) {
+        for (listener in disconnectionListeners) {
+          listener.connectionEvent(wireConnection)
+        }
+      }
+    }
     return id.id()
   }
 
diff --git a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
index 6270eb9..0ed9795 100644
--- a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
+++ b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
@@ -13,6 +13,7 @@
 package org.apache.tuweni.rlpx.vertx;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -26,15 +27,19 @@ import org.apache.tuweni.junit.BouncyCastleExtension;
 import org.apache.tuweni.junit.VertxExtension;
 import org.apache.tuweni.junit.VertxInstance;
 import org.apache.tuweni.rlpx.MemoryWireConnectionsRepository;
+import org.apache.tuweni.rlpx.RLPxService;
 import org.apache.tuweni.rlpx.wire.DisconnectReason;
 import org.apache.tuweni.rlpx.wire.SubProtocol;
 import org.apache.tuweni.rlpx.wire.SubProtocolClient;
+import org.apache.tuweni.rlpx.wire.SubProtocolHandler;
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 import org.apache.tuweni.rlpx.wire.WireConnection;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.vertx.core.Vertx;
 import org.junit.jupiter.api.Test;
@@ -156,20 +161,56 @@ class VertxRLPxServiceTest {
     SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
 
     MemoryWireConnectionsRepository repository = new MemoryWireConnectionsRepository();
+    AtomicBoolean called = new AtomicBoolean();
+    repository.addConnectionListener(conn -> called.set(true));
+    List<SubProtocol> protocols = Collections.singletonList(new SubProtocol() {
+      @Override
+      public SubProtocolIdentifier id() {
+        return SubProtocolIdentifier.of("eth", 63);
+      }
+
+      @Override
+      public boolean supports(SubProtocolIdentifier subProtocolIdentifier) {
+        return false;
+      }
+
+      @Override
+      public int versionRange(int version) {
+        return 0;
+      }
+
+      @Override
+      public SubProtocolHandler createHandler(RLPxService service, SubProtocolClient client) {
+        SubProtocolHandler handler = mock(SubProtocolHandler.class);
+        when(handler.stop()).thenReturn(AsyncCompletion.COMPLETED);
+        return handler;
+      }
+
+      @Override
+      public SubProtocolClient createClient(RLPxService service) {
+        return mock(SubProtocolClient.class);
+      }
+    });
     VertxRLPxService service =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", repository);
+        new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", repository);
     service.start().join();
 
     MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
     VertxRLPxService peerService =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", peerRepository);
+        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", peerRepository);
     peerService.start().join();
 
     try {
-      service.connectTo(peerPair.publicKey(), new InetSocketAddress("localhost", peerService.actualPort()));
-      Thread.sleep(3000);
+      WireConnection conn =
+          service.connectTo(peerPair.publicKey(), new InetSocketAddress("localhost", peerService.actualPort())).get();
+      assertNotNull(conn);
       assertEquals(1, repository.asMap().size());
 
+      AtomicBoolean disconnect = new AtomicBoolean();
+      repository.addDisconnectionListener(c -> disconnect.set(true));
+      service.disconnect(conn, DisconnectReason.CLIENT_QUITTING);
+      assertTrue(disconnect.get());
+
     } finally {
       service.stop();
       peerService.stop();
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
index 8adc6b1..fe3c5e7 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
@@ -12,9 +12,13 @@
  */
 package org.apache.tuweni.rlpx;
 
+import static org.apache.tuweni.rlpx.wire.WireConnection.Event.*;
+
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 import org.apache.tuweni.rlpx.wire.WireConnection;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,10 +32,26 @@ public class MemoryWireConnectionsRepository implements WireConnectionRepository
 
   private final Map<String, WireConnection> connections = new ConcurrentHashMap<>();
 
+  private final List<Listener> connectionListeners = new ArrayList<>();
+
+  private final List<Listener> disconnectionListeners = new ArrayList<>();
+
   @Override
   public String add(WireConnection wireConnection) {
     String id = UUID.randomUUID().toString();
     connections.put(id, wireConnection);
+    wireConnection.registerListener(evt -> {
+      if (evt == DISCONNECTED) { // forget the connection first, then tell the disconnection listeners
+        connections.remove(id);
+        for (Listener subscriber : disconnectionListeners) {
+          subscriber.connectionEvent(wireConnection);
+        }
+      } else if (evt == CONNECTED) {
+        for (Listener subscriber : connectionListeners) {
+          subscriber.connectionEvent(wireConnection);
+        }
+      }
+    });
     return id;
   }
 
@@ -58,4 +78,14 @@ public class MemoryWireConnectionsRepository implements WireConnectionRepository
   public Map<String, WireConnection> asMap() {
     return connections;
   }
+
+  @Override
+  public void addConnectionListener(Listener listener) {
+    connectionListeners.add(listener);
+  }
+
+  @Override
+  public void addDisconnectionListener(Listener listener) {
+    disconnectionListeners.add(listener);
+  }
 }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
index d239a49..9807a02 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
@@ -24,6 +24,19 @@ import javax.annotation.Nullable;
 public interface WireConnectionRepository {
 
   /**
+   * Connection listener
+   */
+  interface Listener {
+
+    /**
+     * Callback invoked with the connection that a connect or disconnect event applies to.
+     * 
+     * @param conn the connection that changed state
+     */
+    void connectionEvent(WireConnection conn);
+  }
+
+  /**
    * Adds a new wire connection to the repository.
    * 
    * @param wireConnection the new wire connection
@@ -62,4 +75,19 @@ public interface WireConnectionRepository {
    *
    */
   void close();
+
+  /**
+   * Adds a listener called when a connection occurs, i.e. once the connection is established and capabilities are
+   * exchanged.
+   * 
+   * @param listener the listener to notify with the connected {@link WireConnection}
+   */
+  void addConnectionListener(Listener listener);
+
+  /**
+   * Adds a listener called when a disconnection occurs, whether initiated by us or by the peer.
+   * 
+   * @param listener the listener to notify with the disconnected {@link WireConnection}
+   */
+  void addDisconnectionListener(Listener listener);
 }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index e887214..70fd081 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -19,6 +19,8 @@ import org.apache.tuweni.concurrent.CompletableAsyncResult;
 import org.apache.tuweni.crypto.SECP256K1;
 import org.apache.tuweni.rlpx.RLPxMessage;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -59,6 +61,7 @@ public final class DefaultWireConnection implements WireConnection {
   private DisconnectReason disconnectReason;
   private boolean disconnectRequested;
   private boolean disconnectReceived;
+  private EventListener eventListener = event -> {}; // no-op default: events may fire before registerListener()
 
   /**
    * Default constructor.
@@ -150,17 +153,21 @@ public final class DefaultWireConnection implements WireConnection {
                   .stream()
                   .map(subprotocols::get)
                   .map(handler -> handler.handleNewPeerConnection(this)));
-      allSubProtocols.thenRun(() -> ready.complete(this));
+      allSubProtocols.thenRun(() -> {
+        ready.complete(this);
+        eventListener.onEvent(Event.CONNECTED);
+      });
       return;
     } else if (message.messageId() == 1) {
       DisconnectMessage disconnect = DisconnectMessage.read(message.content());
       logger.info("Received disconnect {}", disconnect);
-      disconnectHandler.run();
       disconnectReceived = true;
       disconnectReason = DisconnectReason.valueOf(disconnect.reason());
+      disconnectHandler.run();
       if (!ready.isDone()) {
         ready.cancel();
       }
+      eventListener.onEvent(Event.DISCONNECTED);
       return;
     }
 
@@ -233,9 +240,10 @@ public final class DefaultWireConnection implements WireConnection {
   public void disconnect(DisconnectReason reason) {
     logger.debug("Sending disconnect message with reason {}", reason);
     writer.accept(new RLPxMessage(1, new DisconnectMessage(reason).toBytes()));
-    disconnectHandler.run();
     disconnectRequested = true;
     disconnectReason = reason;
+    disconnectHandler.run();
+    eventListener.onEvent(Event.DISCONNECTED);
   }
 
   /**
@@ -286,6 +294,15 @@ public final class DefaultWireConnection implements WireConnection {
     return false;
   }
 
+  @Override
+  public Collection<SubProtocolIdentifier> agreedSubprotocols() {
+    // Flatten the capabilities of every sub-protocol in the range map into one freshly allocated list.
+    List<SubProtocolIdentifier> agreed = new ArrayList<>();
+    Collection<SubProtocol> negotiated = subprotocolRangeMap.asMapOfRanges().values();
+    negotiated.forEach(sp -> agreed.addAll(sp.getCapabilities()));
+    return agreed;
+  }
+
   public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int messageType, Bytes message) {
     logger.trace("Sending sub-protocol message {} {}", messageType, message);
     Integer offset = null;
@@ -344,4 +361,9 @@ public final class DefaultWireConnection implements WireConnection {
   public HelloMessage getPeerHello() {
     return peerHelloMessage;
   }
+
+  @Override
+  public void registerListener(EventListener listener) {
+    eventListener = listener;
+  }
 }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
index 5b407dd..83ba7a6 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
@@ -15,12 +15,31 @@ package org.apache.tuweni.rlpx.wire;
 
 import org.apache.tuweni.crypto.SECP256K1;
 
+import java.util.Collection;
+
 /**
  * A stateful connection between two peers under the Devp2p wire protocol.
  */
 public interface WireConnection {
 
   /**
+   * Listener of events occurring on the connection.
+   */
+  interface EventListener {
+
+    /**
+     * Callback for the listener to be notified of events.
+     * 
+     * @param event the event type
+     */
+    void onEvent(Event event);
+  }
+
+  public enum Event {
+    CONNECTED, DISCONNECTED;
+  }
+
+  /**
    * Returns true if the connection supports the subprotocol
    * 
    * @param subProtocolIdentifier the subprotocol identifier
@@ -29,6 +48,13 @@ public interface WireConnection {
   boolean supports(SubProtocolIdentifier subProtocolIdentifier);
 
   /**
+   * Provides the subprotocols the connection supports in common with the service.
+   * 
+   * @return agreed subprotocols
+   */
+  Collection<SubProtocolIdentifier> agreedSubprotocols();
+
+  /**
    * Whether the peer asked to disconnect
    * 
    * @return true if the peer asked to disconnect
@@ -85,4 +111,11 @@ public interface WireConnection {
    * @return the hello message the remote peer sent.
    */
   HelloMessage getPeerHello();
+
+  /**
+   * Registers a listener to consume events sent by this connection.
+   *
+   * @param listener the function called when an event triggers
+   */
+  void registerListener(EventListener listener);
 }
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java
index 216a46a..9834e13 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepositoryTest.java
@@ -16,12 +16,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import org.apache.tuweni.rlpx.wire.WireConnection;
 
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 
 class MemoryWireConnectionsRepositoryTest {
 
@@ -44,4 +47,21 @@ class MemoryWireConnectionsRepositoryTest {
     assertFalse(iter.hasNext());
   }
 
+  @Test
+  void testListeners() {
+    MemoryWireConnectionsRepository repo = new MemoryWireConnectionsRepository();
+    WireConnection conn = mock(WireConnection.class);
+    ArgumentCaptor<WireConnection.EventListener> listener = ArgumentCaptor.forClass(WireConnection.EventListener.class);
+    repo.add(conn);
+    verify(conn).registerListener(listener.capture());
+    AtomicBoolean called = new AtomicBoolean(false);
+    repo.addConnectionListener(c -> called.set(true));
+    listener.getValue().onEvent(WireConnection.Event.CONNECTED);
+    assertTrue(called.get());
+    AtomicBoolean calledDisconnect = new AtomicBoolean(false);
+    repo.addDisconnectionListener(c -> calledDisconnect.set(true));
+    listener.getValue().onEvent(WireConnection.Event.DISCONNECTED);
+    assertTrue(calledDisconnect.get());
+  }
+
 }
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
index 4f0f4d2..9cf7719 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
@@ -40,7 +40,8 @@ class DefaultWireConnectionTest {
         new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
         }, () -> {
         }, new LinkedHashMap<>(), 3, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
-
+    conn.registerListener(event -> {
+    });
     conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY));
     assertEquals(1, capturedDisconnect.get().messageId());
     DisconnectMessage msg = DisconnectMessage.read(capturedDisconnect.get().content());
@@ -55,6 +56,8 @@ class DefaultWireConnectionTest {
         new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
         }, () -> {
         }, new LinkedHashMap<>(), 4, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+    conn.registerListener(event -> {
+    });
     conn.sendHello();
     conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY));
     assertEquals(1, capturedDisconnect.get().messageId());
@@ -70,6 +73,8 @@ class DefaultWireConnectionTest {
         new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
         }, () -> {
         }, new LinkedHashMap<>(), 28, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+    conn.registerListener(event -> {
+    });
     conn.sendHello();
     conn
         .messageReceived(
@@ -92,6 +97,8 @@ class DefaultWireConnectionTest {
         new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
         }, () -> {
         }, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+    conn.registerListener(event -> {
+    });
     conn.sendHello();
     conn
         .messageReceived(
@@ -110,6 +117,8 @@ class DefaultWireConnectionTest {
         new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
         }, () -> {
         }, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+    conn.registerListener(event -> {
+    });
     conn.sendHello();
     conn
         .messageReceived(
@@ -129,6 +138,8 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn = new DefaultWireConnection(nodeId, nodeId, capturedDisconnect::set, helloMessage -> {
     }, () -> {
     }, new LinkedHashMap<>(), 33, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+    conn.registerListener(event -> {
+    });
     conn.sendHello();
     conn
         .messageReceived(
@@ -147,6 +158,8 @@ class DefaultWireConnectionTest {
         new DefaultWireConnection(nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> {
         }, () -> {
         }, new LinkedHashMap<>(), 5, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
+    conn.registerListener(event -> {
+    });
     conn.sendHello();
     conn
         .messageReceived(
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
index e267588..0279818 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
@@ -55,6 +55,8 @@ class PingPongTest {
         AsyncResult.incomplete(),
         "127.0.0.1",
         1234);
+    conn.registerListener(event -> {
+    });
 
     AsyncCompletion completion = conn.sendPing();
     assertFalse(completion.isDone());
@@ -70,7 +72,8 @@ class PingPongTest {
     DefaultWireConnection conn = new DefaultWireConnection(nodeId, peerNodeId, capturedPong::set, helloMessage -> {
     }, () -> {
     }, new LinkedHashMap<>(), 1, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 1234);
-
+    conn.registerListener(event -> {
+    });
     conn.messageReceived(new RLPxMessage(2, Bytes.EMPTY));
     assertNotNull(capturedPong.get());
     assertEquals(3, capturedPong.get().messageId());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org