You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2023/01/29 02:33:58 UTC

[incubator-tuweni] branch main updated: Allow to configure max clients of rlpx service

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new 0235f6024 Allow to configure max clients of rlpx service
     new 8fa509038 Merge pull request #497 from atoulme/max_clients
0235f6024 is described below

commit 0235f6024d8843ab4577bd21c0c821ca9b79387a
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Jan 28 18:07:05 2023 -0800

    Allow to configure max clients of rlpx service
---
 .../tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt  |  7 ++-
 .../devp2p/eth/SendPendingTransactionsTest.kt      |  3 +-
 .../devp2p/proxy/SendDataToAnotherNodeTest.kt      |  2 +
 .../org/apache/tuweni/ethclient/EthereumClient.kt  |  1 +
 .../tuweni/ethclient/EthereumClientConfig.kt       | 10 ++++
 .../org/apache/tuweni/eth/crawler/CrawlerApp.kt    |  1 +
 .../tuweni/rlpx/vertx/VertxAcceptanceTest.java     |  6 ++-
 .../tuweni/rlpx/vertx/VertxRLPxServiceTest.java    | 61 ++++++++++++++++++----
 .../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 30 +++++++++++
 9 files changed, 106 insertions(+), 15 deletions(-)

diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
index 68065939c..815d747ab 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
@@ -53,7 +53,7 @@ class ConnectToAnotherNodeTest {
   @Disabled
   @Test
   fun testCollectHeaders(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
-    val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
+    val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
     val genesisFile = GenesisFile.read(contents)
     val genesisBlock = genesisFile.toBlock()
 
@@ -88,6 +88,7 @@ class ConnectToAnotherNodeTest {
         )
       ),
       "Tuweni Experiment 0.1",
+      10,
       meter
     )
     service.start().await()
@@ -116,7 +117,7 @@ class ConnectToAnotherNodeTest {
   @Disabled("flaky")
   @Test
   fun twoServers(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
-    val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
+    val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
     val genesisBlock = GenesisFile.read(contents).toBlock()
 
     val repository = BlockchainRepository.init(
@@ -150,6 +151,7 @@ class ConnectToAnotherNodeTest {
         )
       ),
       "Tuweni Experiment 0.1",
+      10,
       meter
     )
 
@@ -185,6 +187,7 @@ class ConnectToAnotherNodeTest {
         )
       ),
       "Tuweni Experiment 0.1",
+      10,
       meter
     )
     val result = AsyncCompletion.allOf(service.start(), service2.start()).then {
diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt
index 452ba4bc8..82596c56f 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt
@@ -62,7 +62,7 @@ class SendPendingTransactionsTest {
 
   @Test
   fun testSendPendingTransactions(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
-    val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json").readAllBytes()
+    val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json")!!.readAllBytes()
     val genesisFile = GenesisFile.read(contents)
     val genesisBlock = genesisFile.toBlock()
 
@@ -97,6 +97,7 @@ class SendPendingTransactionsTest {
         )
       ),
       "Tuweni Experiment 0.1",
+      10,
       meter
     )
     service.start().await()
diff --git a/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
index 5fe00bee6..d2e824538 100644
--- a/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
+++ b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
@@ -54,6 +54,7 @@ class SendDataToAnotherNodeTest {
         ProxySubprotocol()
       ),
       "Tuweni Experiment 0.1",
+      10,
       meter
     )
 
@@ -66,6 +67,7 @@ class SendDataToAnotherNodeTest {
       service2kp,
       listOf(ProxySubprotocol()),
       "Tuweni Experiment 0.1",
+      10,
       meter
     )
     val recorder = RecordingClientHandler()
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 6267e9ebb..792e3d9ea 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -272,6 +272,7 @@ class EthereumClient(
           rlpxConfig.keyPair(),
           listOf(ethSubprotocol, proxySubprotocol),
           rlpxConfig.clientName(),
+          rlpxConfig.maxConnections(),
           meter,
           adapter
         )
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
index aead7ad2d..fcf4202fa 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
@@ -66,6 +66,7 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
       RLPxServiceConfigurationImpl(
         section,
         sectionConfig.getString("clientName"),
+        sectionConfig.getInteger("maxConnections"),
         sectionConfig.getInteger("port"),
         sectionConfig.getString("networkInterface"),
         sectionConfig.getInteger("advertisedPort"),
@@ -337,6 +338,12 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
         "Port to advertise in communications as the RLPx service port",
         PropertyValidator.isValidPort()
       )
+      rlpx.addInteger(
+        "maxConnections",
+        50,
+        "Maximum number of clients",
+        PropertyValidator.isGreaterOrEqual(1)
+      )
       rlpx.addString("clientName", "Apache Tuweni", "Name of the Ethereum client", null)
       rlpx.addString("repository", "default", "Name of the blockchain repository", null)
       rlpx.addString("peerRepository", "default", "Peer repository to which records should go", null)
@@ -430,6 +437,7 @@ interface RLPxServiceConfiguration {
   fun repository(): String
   fun getName(): String
   fun clientName(): String
+  fun maxConnections(): Int
   fun peerRepository(): String
 }
 
@@ -501,6 +509,7 @@ internal class PeerRepositoryConfigurationImpl(private val repoName: String, pri
 internal data class RLPxServiceConfigurationImpl(
   private val name: String,
   val clientName: String,
+  val maxConnections: Int,
   val port: Int,
   val networkInterface: String,
   val advertisedPort: Int,
@@ -524,6 +533,7 @@ internal data class RLPxServiceConfigurationImpl(
   override fun getName(): String = name
 
   override fun clientName(): String = clientName
+  override fun maxConnections(): Int = maxConnections
 
   override fun peerRepository(): String = peerRepository
 }
diff --git a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
index 33e300297..f05fc1376 100644
--- a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
+++ b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
@@ -193,6 +193,7 @@ class CrawlerApplication(
       SECP256K1.KeyPair.random(),
       listOf(ethHelloProtocol),
       "Apache Tuweni network crawler",
+      50,
       meter,
       wireConnectionsRepository
     )
diff --git a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
index 920203943..28ecdf92e 100644
--- a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
+++ b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
@@ -125,6 +125,7 @@ class VertxAcceptanceTest {
         kp,
         Collections.singletonList(sp),
         "Client 1",
+        10,
         meter,
         repository);
     MemoryWireConnectionsRepository secondRepository = new MemoryWireConnectionsRepository();
@@ -137,6 +138,7 @@ class VertxAcceptanceTest {
         secondKp,
         Collections.singletonList(secondSp),
         "Client 2",
+        10,
         meter,
         secondRepository);
     service.start().join();
@@ -180,6 +182,7 @@ class VertxAcceptanceTest {
         kp,
         Collections.singletonList(sp),
         "Client 1",
+        10,
         meter,
         repository);
     VertxRLPxService secondService = new VertxRLPxService(
@@ -190,6 +193,7 @@ class VertxAcceptanceTest {
         secondKp,
         Collections.singletonList(secondSp),
         "Client 2",
+        10,
         meter,
         secondRepository);
     service.start().join();
@@ -282,7 +286,7 @@ class VertxAcceptanceTest {
           public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier identifier) {
             return null;
           }
-        }), "Client 1", meter, repository);
+        }), "Client 1", 10, meter, repository);
     service.start().join();
 
     AsyncResult<WireConnection> completion = service
diff --git a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
index a59b5910b..754ac76db 100644
--- a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
+++ b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
@@ -65,6 +65,7 @@ class VertxRLPxServiceTest {
             SECP256K1.KeyPair.random(),
             new ArrayList<>(),
             "a",
+            10,
             meter));
   }
 
@@ -80,6 +81,7 @@ class VertxRLPxServiceTest {
             SECP256K1.KeyPair.random(),
             new ArrayList<>(),
             "a",
+            10,
             meter));
   }
 
@@ -95,6 +97,23 @@ class VertxRLPxServiceTest {
             SECP256K1.KeyPair.random(),
             new ArrayList<>(),
             null,
+            10,
+            meter));
+  }
+
+  @Test
+  void invalidMaxConnections(@VertxInstance Vertx vertx) {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> new VertxRLPxService(
+            vertx,
+            34,
+            "localhost",
+            23,
+            SECP256K1.KeyPair.random(),
+            new ArrayList<>(),
+            "foo",
+            -1,
             meter));
   }
 
@@ -110,6 +129,7 @@ class VertxRLPxServiceTest {
             SECP256K1.KeyPair.random(),
             new ArrayList<>(),
             "   ",
+            10,
             meter));
   }
 
@@ -123,6 +143,7 @@ class VertxRLPxServiceTest {
         SECP256K1.KeyPair.random(),
         new ArrayList<>(),
         "a",
+        10,
         meter);
 
     service.start().join();
@@ -136,7 +157,7 @@ class VertxRLPxServiceTest {
   @Test
   void startServiceWithPortZero(@VertxInstance Vertx vertx) throws InterruptedException {
     VertxRLPxService service =
-        new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", meter);
+        new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", 10, meter);
 
     service.start().join();
     try {
@@ -149,8 +170,16 @@ class VertxRLPxServiceTest {
 
   @Test
   void stopServiceWithoutStartingItFirst(@VertxInstance Vertx vertx) {
-    VertxRLPxService service =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, SECP256K1.KeyPair.random(), new ArrayList<>(), "abc", meter);
+    VertxRLPxService service = new VertxRLPxService(
+        vertx,
+        0,
+        "localhost",
+        10000,
+        SECP256K1.KeyPair.random(),
+        new ArrayList<>(),
+        "abc",
+        10,
+        meter);
     AsyncCompletion completion = service.stop();
     assertTrue(completion.isDone());
   }
@@ -160,11 +189,11 @@ class VertxRLPxServiceTest {
     SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
     SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
     VertxRLPxService service =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", meter);
+        new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", 10, meter);
     service.start().join();
 
     VertxRLPxService peerService =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter);
+        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", 10, meter);
     peerService.start().join();
 
     WireConnection conn =
@@ -179,12 +208,12 @@ class VertxRLPxServiceTest {
     SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
     SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
     List<SubProtocol> protocols = Arrays.asList(new VertxAcceptanceTest.MyCustomSubProtocol());
-    VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter);
+    VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter);
     service.start().join();
 
 
     VertxRLPxService peerService =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter);
+        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter);
     peerService.start().join();
 
     WireConnection conn = null;
@@ -234,12 +263,12 @@ class VertxRLPxServiceTest {
       }
     });
     VertxRLPxService service =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter, repository);
+        new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter, repository);
     service.start().join();
 
     MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
     VertxRLPxService peerService =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter, peerRepository);
+        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter, peerRepository);
     peerService.start().join();
 
     try {
@@ -263,8 +292,17 @@ class VertxRLPxServiceTest {
   void getClientWhenNotReady(@VertxInstance Vertx vertx) {
     SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
     MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
-    VertxRLPxService peerService =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter, peerRepository);
+    VertxRLPxService peerService = new VertxRLPxService(
+        vertx,
+        0,
+        "localhost",
+        10000,
+        peerPair,
+        new ArrayList<>(),
+        "abc",
+        10,
+        meter,
+        peerRepository);
     assertThrows(IllegalStateException.class, () -> {
       peerService.getClient(SubProtocolIdentifier.of("foo", 1));
     });
@@ -286,6 +324,7 @@ class VertxRLPxServiceTest {
         SECP256K1.KeyPair.random(),
         Collections.singletonList(sp),
         "abc",
+        10,
         meter,
         peerRepository);
     peerService.start().join();
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index f4cd78b31..64ddadde3 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
@@ -79,8 +80,12 @@ public final class VertxRLPxService implements RLPxService {
   private final int connectTimeout = 5 * 1000;
   private final int idleTimeout = 30 * 1000;
 
+  private final int maxConnections;
+
   private LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> handlers;
   private LinkedHashMap<SubProtocolIdentifier, SubProtocolClient> clients;
+
+  private AtomicInteger connectionsCount = new AtomicInteger(0);
   private NetClient client;
   private NetServer server;
 
@@ -100,6 +105,7 @@ public final class VertxRLPxService implements RLPxService {
    * @param identityKeyPair the identity of this client
    * @param subProtocols subprotocols supported
    * @param clientId the client identifier, such as "RLPX 1.2/build 389"
+   * @param maxConnections the max number of client connections allowed
    * @param meter the metric service meter used to monitor useful metrics in the service
    */
   public VertxRLPxService(
@@ -110,6 +116,7 @@ public final class VertxRLPxService implements RLPxService {
       KeyPair identityKeyPair,
       List<SubProtocol> subProtocols,
       String clientId,
+      int maxConnections,
       @Nullable Meter meter) {
     this(
         vertx,
@@ -119,6 +126,7 @@ public final class VertxRLPxService implements RLPxService {
         identityKeyPair,
         subProtocols,
         clientId,
+        maxConnections,
         meter,
         new MemoryWireConnectionsRepository());
   }
@@ -133,6 +141,7 @@ public final class VertxRLPxService implements RLPxService {
    * @param identityKeyPair the identity of this client
    * @param subProtocols subprotocols supported
    * @param clientId the client identifier, such as "RLPX 1.2/build 389"
+   * @param maxConnections the max number of client connections allowed
    * @param meter the metric service meter used to monitor useful metrics in the service
    * @param repository a wire connection repository
    */
@@ -144,6 +153,7 @@ public final class VertxRLPxService implements RLPxService {
       KeyPair identityKeyPair,
       List<SubProtocol> subProtocols,
       String clientId,
+      int maxConnections,
       @Nullable Meter meter,
       WireConnectionRepository repository) {
     checkPort(listenPort);
@@ -151,6 +161,9 @@ public final class VertxRLPxService implements RLPxService {
     if (clientId == null || clientId.trim().isEmpty()) {
       throw new IllegalArgumentException("Client ID must contain a valid identifier");
     }
+    if (maxConnections <= 0) {
+      throw new IllegalArgumentException("Max connections must be a positive integer");
+    }
     this.vertx = vertx;
     this.listenPort = listenPort;
     this.networkInterface = networkInterface;
@@ -158,6 +171,7 @@ public final class VertxRLPxService implements RLPxService {
     this.keyPair = identityKeyPair;
     this.subProtocols = subProtocols;
     this.clientId = clientId;
+    this.maxConnections = maxConnections;
     this.repository = repository;
     repository.addDisconnectionListener(c -> {
       if (keepAliveList.contains(c.peerPublicKey())) {
@@ -258,11 +272,19 @@ public final class VertxRLPxService implements RLPxService {
     if (connectionsCreatedCounter != null) {
       connectionsCreatedCounter.add(1);
     }
+    int newCount = connectionsCount.incrementAndGet(); // count including this connection
+
     netSocket.closeHandler((handler) -> {
       if (connectionsDisconnectedCounter != null) {
         connectionsDisconnectedCounter.add(1);
       }
+      connectionsCount.getAndDecrement();
     });
+    if (newCount > maxConnections) {
+      logger.info("disconnecting from incoming connection {} as over max connections", netSocket.remoteAddress());
+      netSocket.close();
+      return;
+    }
     netSocket.handler(new Handler<>() {
 
       private RLPxConnection conn;
@@ -357,6 +379,12 @@ public final class VertxRLPxService implements RLPxService {
     }
 
     CompletableAsyncResult<WireConnection> connected = AsyncResult.incomplete();
+    if (connectionsCount.get() >= maxConnections) { // already at capacity; one more would exceed the limit
+      logger
+          .info("Cancelling connection to {} with public key {}, max connections reached", peerAddress, peerPublicKey);
+      connected.cancel();
+      return connected;
+    }
     logger.info("Connecting to {} with public key {}", peerAddress, peerPublicKey);
     client
         .connect(
@@ -366,6 +394,7 @@ public final class VertxRLPxService implements RLPxService {
               if (connectionsCreatedCounter != null) {
                 connectionsCreatedCounter.add(1);
               }
+              connectionsCount.getAndIncrement();
               Bytes32 nonce = RLPxConnectionFactory.generateRandomBytes32();
               KeyPair ephemeralKeyPair = KeyPair.random();
               Bytes initHandshakeMessage = RLPxConnectionFactory.init(keyPair, peerPublicKey, ephemeralKeyPair, nonce);
@@ -380,6 +409,7 @@ public final class VertxRLPxService implements RLPxService {
                 if (!connected.isDone()) {
                   connected.cancel();
                 }
+                connectionsCount.getAndDecrement();
               });
 
               netSocket.handler(new Handler<>() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org