You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2022/01/19 07:01:15 UTC
[incubator-tuweni] branch main updated: Fix a series of issues blocking 2 clients from talking to each other
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 09abe88 Fix a series of issues blocking 2 clients from talking to each other
new ea2f814 Merge pull request #365 from atoulme/connect_two_ethereum_clients
09abe88 is described below
commit 09abe88d5fe8343f780ea03459f4f1cb848aaac5
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Mon Jan 17 23:38:20 2022 -0800
Fix a series of issues blocking 2 clients from talking to each other
---
.../org/apache/tuweni/devp2p/eth/EthHandler.kt | 4 ++++
.../org/apache/tuweni/devp2p/eth/EthHandler66.kt | 6 +++++-
.../org/apache/tuweni/devp2p/eth/EthHandlerTest.kt | 13 +++++++++++
.../tuweni/ethclient/EthereumClientRunTest.kt | 25 +++++++++++++---------
.../org/apache/tuweni/ethclient/EthereumClient.kt | 14 ++++++------
.../tuweni/ethclient/EthereumPeerRepository.kt | 9 ++++----
.../org/apache/tuweni/ethclient/Synchronizer.kt | 4 +++-
.../WireConnectionPeerRepositoryAdapter.kt | 5 +++--
.../java/org/apache/tuweni/rlp/BytesRLPReader.java | 2 +-
.../org/apache/tuweni/rlp/BytesRLPReaderTest.java | 4 ++--
.../tuweni/rlpx/wire/DefaultWireConnection.java | 10 ++++-----
11 files changed, 63 insertions(+), 33 deletions(-)
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index 2169599..9290232 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -197,6 +197,10 @@ internal class EthHandler(
}
private suspend fun handleGetBlockBodies(connection: WireConnection, message: GetBlockBodies) {
+ if (message.hashes.isEmpty()) {
+ service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
+ return
+ }
service.send(
connection.agreedSubprotocolVersion(ETH62.name()),
MessageType.BlockBodies.code,
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
index 05bfcef..c45ad6b 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
@@ -56,7 +56,7 @@ internal class EthHandler66(
}
return asyncCompletion {
logger.debug("Receiving message of type {}", messageType)
- val pair = RLP.decode(message) {
+ val pair = RLP.decodeList(message) {
Pair(it.readValue(), it.readRemaining())
}
val requestIdentifier = pair.first
@@ -232,6 +232,10 @@ internal class EthHandler66(
}
private suspend fun handleGetBlockBodies(connection: WireConnection, requestIdentifier: Bytes, message: GetBlockBodies) {
+ if (message.hashes.isEmpty()) {
+ service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
+ return
+ }
val bodies = BlockBodies(controller.findBlockBodies(message.hashes))
service.send(
EthSubprotocol.ETH66,
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
index 3351244..adfac50 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
@@ -42,6 +42,7 @@ import org.apache.tuweni.junit.VertxExtension
import org.apache.tuweni.kv.MapKeyValueStore
import org.apache.tuweni.rlp.RLP
import org.apache.tuweni.rlpx.RLPxService
+import org.apache.tuweni.rlpx.wire.DisconnectReason
import org.apache.tuweni.rlpx.wire.WireConnection
import org.apache.tuweni.units.bigints.UInt256
import org.apache.tuweni.units.bigints.UInt64
@@ -278,4 +279,16 @@ class EthHandlerTest {
assertEquals(1, messageRead.transactionReceipts.size)
assertEquals(txReceipt, messageRead.transactionReceipts[0][0])
}
+
+ @Test
+ fun testGetBodiesEmpty(): Unit = runBlocking {
+ val conn = mock(WireConnection::class.java)
+ handler.handle(
+ conn,
+ MessageType.GetBlockBodies.code,
+ GetBlockBodies(listOf()).toBytes()
+ ).await()
+
+ verify(service).disconnect(conn, DisconnectReason.SUBPROTOCOL_REASON)
+ }
}
diff --git a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt
index 3c35046..8998f36 100644
--- a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt
+++ b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt
@@ -19,10 +19,14 @@ package org.apache.tuweni.ethclient
import io.vertx.core.Vertx
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.concurrent.AsyncResult
+import org.apache.tuweni.concurrent.coroutines.await
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.junit.BouncyCastleExtension
import org.apache.tuweni.junit.VertxExtension
import org.apache.tuweni.junit.VertxInstance
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
@@ -37,16 +41,11 @@ class EthereumClientRunTest {
"""
[peerRepository.default]
type="memory"
- [metrics]
- networkInterface="127.0.0.1"
- port=9091
[storage.default]
path="data"
genesis="default"
[genesis.default]
path="classpath:/default.json"
- [static.default]
- peerRepository="default"
[rlpx.default]
networkInterface="127.0.0.1"
port=30301
@@ -57,9 +56,6 @@ class EthereumClientRunTest {
"""
[peerRepository.default]
type="memory"
- [metrics]
- networkInterface="127.0.0.1"
- port=9092
[storage.default]
path="data2"
genesis="default"
@@ -68,14 +64,23 @@ class EthereumClientRunTest {
[static.default]
enodes=["enode://${keyPair.publicKey().toHexString()}@127.0.0.1:30301"]
peerRepository="default"
+ [rlpx.default]
+ networkInterface="127.0.0.1"
+ port=30304
+ key="${SECP256K1.KeyPair.random().secretKey().bytes().toHexString()}"
""".trimMargin()
)
val client1 = EthereumClient(vertx, config1)
val client2 = EthereumClient(vertx, config2)
client1.start()
+ val connectionInfo = AsyncResult.incomplete<EthereumConnection>()
+ client1.peerRepositories["default"]!!.addStatusListener { connectionInfo.complete(it) }
client2.start()
- delay(1000)
- // TODO make sure the connection happens!
+
+ val conn = connectionInfo.await()
+ assertNotNull(conn)
+ assertNotNull(conn.status())
+ assertEquals(client2.config.rlpxServices()[0].keyPair().publicKey(), conn.peer().id().publicKey())
client1.stop()
client2.stop()
}
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 8544dcb..9f1a958 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -244,6 +244,12 @@ class EthereumClient(
adapter
)
services[rlpxConfig.getName()] = service
+ peerRepository.addIdentityListener {
+ service.connectTo(
+ it.publicKey(),
+ InetSocketAddress(it.networkInterface(), it.port())
+ )
+ }
service.start().thenRun {
logger.info("Started Ethereum client ${rlpxConfig.getName()}")
val proxyClient = service.getClient(ProxySubprotocol.ID) as ProxyClient
@@ -265,12 +271,7 @@ class EthereumClient(
}
}
}
- peerRepository.addIdentityListener {
- service.connectTo(
- it.publicKey(),
- InetSocketAddress(it.networkInterface(), it.port())
- )
- }
+
val synchronizer = PeerStatusEthSynchronizer(
repository = repository,
client = service.getClient(ETH66) as EthRequestsManager,
@@ -293,6 +294,7 @@ class EthereumClient(
)
synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
bestSynchronizer.start()
+ logger.info("Finished configuring Ethereum client ${rlpxConfig.getName()}")
}
}
).await()
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt
index edca79b..f680ec8 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt
@@ -33,10 +33,10 @@ import java.util.stream.Stream
interface EthereumPeerRepository : PeerRepository {
/**
* Stores the status message sent for a connection
- * @param connId the ID of the connection
+ * @param peerIdentity the peer identity
* @param status the status message
*/
- fun storeStatus(connId: String, status: Status)
+ fun storeStatus(peerIdentity: Identity, status: Status)
/**
* Provides a stream of active connections.
@@ -87,8 +87,9 @@ class MemoryEthereumPeerRepository : EthereumPeerRepository {
val statusListeners = HashMap<String, (EthereumConnection) -> Unit>()
val identityListeners = HashMap<String, (Identity) -> Unit>()
- override fun storeStatus(connId: String, status: Status) {
- connections[connId]?.let { conn ->
+ override fun storeStatus(peerIdentity: Identity, status: Status) {
+ val connKey = peerMap[peerIdentity]?.let { createConnectionKey(it, peerIdentity) }
+ connections[connKey]?.let { conn ->
(conn as MemoryEthereumConnection).status = status
statusListeners.values.forEach {
it(conn)
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
index 3254e16..e0db77d 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
@@ -54,7 +54,9 @@ abstract class Synchronizer(
}
}
}.awaitAll()
- client.requestBlockBodies(bodiesToRequest)
+ if (!bodiesToRequest.isEmpty()) {
+ client.requestBlockBodies(bodiesToRequest)
+ }
}
}
}
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
index c50c902..ccb0112 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
@@ -17,6 +17,7 @@
package org.apache.tuweni.ethclient
import org.apache.tuweni.devp2p.eth.Status
+import org.apache.tuweni.peer.repository.Identity
import org.apache.tuweni.rlpx.WireConnectionRepository
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
import org.apache.tuweni.rlpx.wire.WireConnection
@@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap
*/
class WireConnectionPeerRepositoryAdapter(val peerRepository: EthereumPeerRepository) : WireConnectionRepository {
- private val wireConnectionToIdentities = ConcurrentHashMap<String, String>()
+ private val wireConnectionToIdentities = ConcurrentHashMap<String, Identity>()
private val connections = ConcurrentHashMap<String, WireConnection>()
private val connectionListeners = ArrayList<WireConnectionRepository.Listener>()
@@ -49,7 +50,7 @@ class WireConnectionPeerRepositoryAdapter(val peerRepository: EthereumPeerReposi
val peer = peerRepository.storePeer(id, Instant.now(), Instant.now())
peerRepository.addConnection(peer, id)
connections[id.id()] = wireConnection
- wireConnectionToIdentities[wireConnection.uri()] = id.id()
+ wireConnectionToIdentities[wireConnection.uri()] = peer.id()
wireConnection.registerListener {
if (it == WireConnection.Event.CONNECTED) {
for (listener in connectionListeners) {
diff --git a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
index 8a4bec0..415df60 100644
--- a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
+++ b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
@@ -38,7 +38,7 @@ final class BytesRLPReader implements RLPReader {
if (remaining == 0) {
return Bytes.EMPTY;
}
- return content.slice(remaining);
+ return content.slice(index++, remaining);
}
@Override
diff --git a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
index 6b0d245..b16f9e5 100644
--- a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
+++ b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
@@ -222,11 +222,11 @@ class BytesRLPReaderTest {
@Test
void shouldReadRemaining() {
- Bytes input = Bytes.fromHexString("83646f6783646f67");
+ Bytes input = Bytes.fromHexString("83646f6783646f6783646f67");
RLP.decode(input, reader -> {
reader.readValue();
assertEquals(4, reader.position());
- assertEquals(Bytes.fromHexString("83646f67"), reader.readRemaining());
+ assertEquals(Bytes.fromHexString("83646f6783646f67"), reader.readRemaining());
return null;
});
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index ebae524..73e1f8b 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -205,11 +205,9 @@ public final class DefaultWireConnection implements WireConnection {
int offset = subProtocolEntry.getKey().lowerEndpoint();
logger.trace("Received message of type {}", message.messageId() - offset);
SubProtocolHandler handler = subprotocols.get(subProtocolEntry.getValue());
- try {
- handler.handle(this, message.messageId() - offset, message.content());
- } catch (Throwable t) {
- logger.error("Handler " + handler.toString() + " threw an exception", t);
- }
+ handler
+ .handle(this, message.messageId() - offset, message.content())
+ .exceptionally(t -> logger.error("Handler " + handler.toString() + " threw an exception", t));
}
}
}
@@ -263,7 +261,7 @@ public final class DefaultWireConnection implements WireConnection {
/**
* Sends a ping message to the remote peer.
- *
+ *
* @return a handler marking completion when a pong response is received
*/
public AsyncCompletion sendPing() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org