You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2023/01/29 02:33:58 UTC
[incubator-tuweni] branch main updated: Allow to configure max clients of rlpx service
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 0235f6024 Allow to configure max clients of rlpx service
new 8fa509038 Merge pull request #497 from atoulme/max_clients
0235f6024 is described below
commit 0235f6024d8843ab4577bd21c0c821ca9b79387a
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Jan 28 18:07:05 2023 -0800
Allow to configure max clients of rlpx service
---
.../tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt | 7 ++-
.../devp2p/eth/SendPendingTransactionsTest.kt | 3 +-
.../devp2p/proxy/SendDataToAnotherNodeTest.kt | 2 +
.../org/apache/tuweni/ethclient/EthereumClient.kt | 1 +
.../tuweni/ethclient/EthereumClientConfig.kt | 10 ++++
.../org/apache/tuweni/eth/crawler/CrawlerApp.kt | 1 +
.../tuweni/rlpx/vertx/VertxAcceptanceTest.java | 6 ++-
.../tuweni/rlpx/vertx/VertxRLPxServiceTest.java | 61 ++++++++++++++++++----
.../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 30 +++++++++++
9 files changed, 106 insertions(+), 15 deletions(-)
diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
index 68065939c..815d747ab 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
@@ -53,7 +53,7 @@ class ConnectToAnotherNodeTest {
@Disabled
@Test
fun testCollectHeaders(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
- val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
+ val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
val genesisFile = GenesisFile.read(contents)
val genesisBlock = genesisFile.toBlock()
@@ -88,6 +88,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
+ 10,
meter
)
service.start().await()
@@ -116,7 +117,7 @@ class ConnectToAnotherNodeTest {
@Disabled("flaky")
@Test
fun twoServers(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
- val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
+ val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
val genesisBlock = GenesisFile.read(contents).toBlock()
val repository = BlockchainRepository.init(
@@ -150,6 +151,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
+ 10,
meter
)
@@ -185,6 +187,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
+ 10,
meter
)
val result = AsyncCompletion.allOf(service.start(), service2.start()).then {
diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt
index 452ba4bc8..82596c56f 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/SendPendingTransactionsTest.kt
@@ -62,7 +62,7 @@ class SendPendingTransactionsTest {
@Test
fun testSendPendingTransactions(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
- val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json").readAllBytes()
+ val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json")!!.readAllBytes()
val genesisFile = GenesisFile.read(contents)
val genesisBlock = genesisFile.toBlock()
@@ -97,6 +97,7 @@ class SendPendingTransactionsTest {
)
),
"Tuweni Experiment 0.1",
+ 10,
meter
)
service.start().await()
diff --git a/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
index 5fe00bee6..d2e824538 100644
--- a/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
+++ b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
@@ -54,6 +54,7 @@ class SendDataToAnotherNodeTest {
ProxySubprotocol()
),
"Tuweni Experiment 0.1",
+ 10,
meter
)
@@ -66,6 +67,7 @@ class SendDataToAnotherNodeTest {
service2kp,
listOf(ProxySubprotocol()),
"Tuweni Experiment 0.1",
+ 10,
meter
)
val recorder = RecordingClientHandler()
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 6267e9ebb..792e3d9ea 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -272,6 +272,7 @@ class EthereumClient(
rlpxConfig.keyPair(),
listOf(ethSubprotocol, proxySubprotocol),
rlpxConfig.clientName(),
+ rlpxConfig.maxConnections(),
meter,
adapter
)
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
index aead7ad2d..fcf4202fa 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
@@ -66,6 +66,7 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
RLPxServiceConfigurationImpl(
section,
sectionConfig.getString("clientName"),
+ sectionConfig.getInteger("maxConnections"),
sectionConfig.getInteger("port"),
sectionConfig.getString("networkInterface"),
sectionConfig.getInteger("advertisedPort"),
@@ -337,6 +338,12 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
"Port to advertise in communications as the RLPx service port",
PropertyValidator.isValidPort()
)
+ rlpx.addInteger(
+ "maxConnections",
+ 50,
+ "Maximum number of clients",
+ PropertyValidator.isGreaterOrEqual(1)
+ )
rlpx.addString("clientName", "Apache Tuweni", "Name of the Ethereum client", null)
rlpx.addString("repository", "default", "Name of the blockchain repository", null)
rlpx.addString("peerRepository", "default", "Peer repository to which records should go", null)
@@ -430,6 +437,7 @@ interface RLPxServiceConfiguration {
fun repository(): String
fun getName(): String
fun clientName(): String
+ fun maxConnections(): Int
fun peerRepository(): String
}
@@ -501,6 +509,7 @@ internal class PeerRepositoryConfigurationImpl(private val repoName: String, pri
internal data class RLPxServiceConfigurationImpl(
private val name: String,
val clientName: String,
+ val maxConnections: Int,
val port: Int,
val networkInterface: String,
val advertisedPort: Int,
@@ -524,6 +533,7 @@ internal data class RLPxServiceConfigurationImpl(
override fun getName(): String = name
override fun clientName(): String = clientName
+ override fun maxConnections(): Int = maxConnections
override fun peerRepository(): String = peerRepository
}
diff --git a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
index 33e300297..f05fc1376 100644
--- a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
+++ b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
@@ -193,6 +193,7 @@ class CrawlerApplication(
SECP256K1.KeyPair.random(),
listOf(ethHelloProtocol),
"Apache Tuweni network crawler",
+ 50,
meter,
wireConnectionsRepository
)
diff --git a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
index 920203943..28ecdf92e 100644
--- a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
+++ b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
@@ -125,6 +125,7 @@ class VertxAcceptanceTest {
kp,
Collections.singletonList(sp),
"Client 1",
+ 10,
meter,
repository);
MemoryWireConnectionsRepository secondRepository = new MemoryWireConnectionsRepository();
@@ -137,6 +138,7 @@ class VertxAcceptanceTest {
secondKp,
Collections.singletonList(secondSp),
"Client 2",
+ 10,
meter,
secondRepository);
service.start().join();
@@ -180,6 +182,7 @@ class VertxAcceptanceTest {
kp,
Collections.singletonList(sp),
"Client 1",
+ 10,
meter,
repository);
VertxRLPxService secondService = new VertxRLPxService(
@@ -190,6 +193,7 @@ class VertxAcceptanceTest {
secondKp,
Collections.singletonList(secondSp),
"Client 2",
+ 10,
meter,
secondRepository);
service.start().join();
@@ -282,7 +286,7 @@ class VertxAcceptanceTest {
public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier identifier) {
return null;
}
- }), "Client 1", meter, repository);
+ }), "Client 1", 10, meter, repository);
service.start().join();
AsyncResult<WireConnection> completion = service
diff --git a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
index a59b5910b..754ac76db 100644
--- a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
+++ b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
@@ -65,6 +65,7 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
+ 10,
meter));
}
@@ -80,6 +81,7 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
+ 10,
meter));
}
@@ -95,6 +97,23 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
null,
+ 10,
+ meter));
+ }
+
+ @Test
+ void invalidMaxConnections(@VertxInstance Vertx vertx) {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> new VertxRLPxService(
+ vertx,
+ 34,
+ "localhost",
+ 23,
+ SECP256K1.KeyPair.random(),
+ new ArrayList<>(),
+ "foo",
+ -1,
meter));
}
@@ -110,6 +129,7 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
" ",
+ 10,
meter));
}
@@ -123,6 +143,7 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
+ 10,
meter);
service.start().join();
@@ -136,7 +157,7 @@ class VertxRLPxServiceTest {
@Test
void startServiceWithPortZero(@VertxInstance Vertx vertx) throws InterruptedException {
VertxRLPxService service =
- new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", meter);
+ new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", 10, meter);
service.start().join();
try {
@@ -149,8 +170,16 @@ class VertxRLPxServiceTest {
@Test
void stopServiceWithoutStartingItFirst(@VertxInstance Vertx vertx) {
- VertxRLPxService service =
- new VertxRLPxService(vertx, 0, "localhost", 10000, SECP256K1.KeyPair.random(), new ArrayList<>(), "abc", meter);
+ VertxRLPxService service = new VertxRLPxService(
+ vertx,
+ 0,
+ "localhost",
+ 10000,
+ SECP256K1.KeyPair.random(),
+ new ArrayList<>(),
+ "abc",
+ 10,
+ meter);
AsyncCompletion completion = service.stop();
assertTrue(completion.isDone());
}
@@ -160,11 +189,11 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
VertxRLPxService service =
- new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", meter);
+ new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", 10, meter);
service.start().join();
VertxRLPxService peerService =
- new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter);
+ new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", 10, meter);
peerService.start().join();
WireConnection conn =
@@ -179,12 +208,12 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
List<SubProtocol> protocols = Arrays.asList(new VertxAcceptanceTest.MyCustomSubProtocol());
- VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter);
+ VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter);
service.start().join();
VertxRLPxService peerService =
- new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter);
+ new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter);
peerService.start().join();
WireConnection conn = null;
@@ -234,12 +263,12 @@ class VertxRLPxServiceTest {
}
});
VertxRLPxService service =
- new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter, repository);
+ new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter, repository);
service.start().join();
MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
VertxRLPxService peerService =
- new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter, peerRepository);
+ new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter, peerRepository);
peerService.start().join();
try {
@@ -263,8 +292,17 @@ class VertxRLPxServiceTest {
void getClientWhenNotReady(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
- VertxRLPxService peerService =
- new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter, peerRepository);
+ VertxRLPxService peerService = new VertxRLPxService(
+ vertx,
+ 0,
+ "localhost",
+ 10000,
+ peerPair,
+ new ArrayList<>(),
+ "abc",
+ 10,
+ meter,
+ peerRepository);
assertThrows(IllegalStateException.class, () -> {
peerService.getClient(SubProtocolIdentifier.of("foo", 1));
});
@@ -286,6 +324,7 @@ class VertxRLPxServiceTest {
SECP256K1.KeyPair.random(),
Collections.singletonList(sp),
"abc",
+ 10,
meter,
peerRepository);
peerService.start().join();
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index f4cd78b31..64ddadde3 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -79,8 +80,12 @@ public final class VertxRLPxService implements RLPxService {
private final int connectTimeout = 5 * 1000;
private final int idleTimeout = 30 * 1000;
+ private final int maxConnections;
+
private LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> handlers;
private LinkedHashMap<SubProtocolIdentifier, SubProtocolClient> clients;
+
+ private AtomicInteger connectionsCount = new AtomicInteger(0);
private NetClient client;
private NetServer server;
@@ -100,6 +105,7 @@ public final class VertxRLPxService implements RLPxService {
* @param identityKeyPair the identity of this client
* @param subProtocols subprotocols supported
* @param clientId the client identifier, such as "RLPX 1.2/build 389"
+ * @param maxConnections the max number of client connections allowed
* @param meter the metric service meter used to monitor useful metrics in the service
*/
public VertxRLPxService(
@@ -110,6 +116,7 @@ public final class VertxRLPxService implements RLPxService {
KeyPair identityKeyPair,
List<SubProtocol> subProtocols,
String clientId,
+ int maxConnections,
@Nullable Meter meter) {
this(
vertx,
@@ -119,6 +126,7 @@ public final class VertxRLPxService implements RLPxService {
identityKeyPair,
subProtocols,
clientId,
+ maxConnections,
meter,
new MemoryWireConnectionsRepository());
}
@@ -133,6 +141,7 @@ public final class VertxRLPxService implements RLPxService {
* @param identityKeyPair the identity of this client
* @param subProtocols subprotocols supported
* @param clientId the client identifier, such as "RLPX 1.2/build 389"
+ * @param maxConnections the max number of client connections allowed
* @param meter the metric service meter used to monitor useful metrics in the service
* @param repository a wire connection repository
*/
@@ -144,6 +153,7 @@ public final class VertxRLPxService implements RLPxService {
KeyPair identityKeyPair,
List<SubProtocol> subProtocols,
String clientId,
+ int maxConnections,
@Nullable Meter meter,
WireConnectionRepository repository) {
checkPort(listenPort);
@@ -151,6 +161,9 @@ public final class VertxRLPxService implements RLPxService {
if (clientId == null || clientId.trim().isEmpty()) {
throw new IllegalArgumentException("Client ID must contain a valid identifier");
}
+ if (maxConnections <= 0) {
+ throw new IllegalArgumentException("Max connections must be a positive integer");
+ }
this.vertx = vertx;
this.listenPort = listenPort;
this.networkInterface = networkInterface;
@@ -158,6 +171,7 @@ public final class VertxRLPxService implements RLPxService {
this.keyPair = identityKeyPair;
this.subProtocols = subProtocols;
this.clientId = clientId;
+ this.maxConnections = maxConnections;
this.repository = repository;
repository.addDisconnectionListener(c -> {
if (keepAliveList.contains(c.peerPublicKey())) {
@@ -258,11 +272,19 @@ public final class VertxRLPxService implements RLPxService {
if (connectionsCreatedCounter != null) {
connectionsCreatedCounter.add(1);
}
+ int newCount = connectionsCount.getAndIncrement();
+
netSocket.closeHandler((handler) -> {
if (connectionsDisconnectedCounter != null) {
connectionsDisconnectedCounter.add(1);
}
+ connectionsCount.getAndDecrement();
});
+ if (newCount > maxConnections) {
+ logger.info("disconnecting from incoming connection {} as over max connections", netSocket.remoteAddress());
+ netSocket.close();
+ return;
+ }
netSocket.handler(new Handler<>() {
private RLPxConnection conn;
@@ -357,6 +379,12 @@ public final class VertxRLPxService implements RLPxService {
}
CompletableAsyncResult<WireConnection> connected = AsyncResult.incomplete();
+ if (connectionsCount.get() > maxConnections) {
+ logger
+ .info("Cancelling connection to {} with public key {}, max connections reached", peerAddress, peerPublicKey);
+ connected.cancel();
+ return connected;
+ }
logger.info("Connecting to {} with public key {}", peerAddress, peerPublicKey);
client
.connect(
@@ -366,6 +394,7 @@ public final class VertxRLPxService implements RLPxService {
if (connectionsCreatedCounter != null) {
connectionsCreatedCounter.add(1);
}
+ connectionsCount.getAndIncrement();
Bytes32 nonce = RLPxConnectionFactory.generateRandomBytes32();
KeyPair ephemeralKeyPair = KeyPair.random();
Bytes initHandshakeMessage = RLPxConnectionFactory.init(keyPair, peerPublicKey, ephemeralKeyPair, nonce);
@@ -380,6 +409,7 @@ public final class VertxRLPxService implements RLPxService {
if (!connected.isDone()) {
connected.cancel();
}
+ connectionsCount.getAndDecrement();
});
netSocket.handler(new Handler<>() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org