You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2022/01/19 07:01:15 UTC

[incubator-tuweni] branch main updated: Fix a series of issues blocking 2 clients from talking to each other

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new 09abe88  Fix a series of issues blocking 2 clients from talking to each other
     new ea2f814  Merge pull request #365 from atoulme/connect_two_ethereum_clients
09abe88 is described below

commit 09abe88d5fe8343f780ea03459f4f1cb848aaac5
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Mon Jan 17 23:38:20 2022 -0800

    Fix a series of issues blocking 2 clients from talking to each other
---
 .../org/apache/tuweni/devp2p/eth/EthHandler.kt     |  4 ++++
 .../org/apache/tuweni/devp2p/eth/EthHandler66.kt   |  6 +++++-
 .../org/apache/tuweni/devp2p/eth/EthHandlerTest.kt | 13 +++++++++++
 .../tuweni/ethclient/EthereumClientRunTest.kt      | 25 +++++++++++++---------
 .../org/apache/tuweni/ethclient/EthereumClient.kt  | 14 ++++++------
 .../tuweni/ethclient/EthereumPeerRepository.kt     |  9 ++++----
 .../org/apache/tuweni/ethclient/Synchronizer.kt    |  4 +++-
 .../WireConnectionPeerRepositoryAdapter.kt         |  5 +++--
 .../java/org/apache/tuweni/rlp/BytesRLPReader.java |  2 +-
 .../org/apache/tuweni/rlp/BytesRLPReaderTest.java  |  4 ++--
 .../tuweni/rlpx/wire/DefaultWireConnection.java    | 10 ++++-----
 11 files changed, 63 insertions(+), 33 deletions(-)

diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index 2169599..9290232 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -197,6 +197,10 @@ internal class EthHandler(
   }
 
   private suspend fun handleGetBlockBodies(connection: WireConnection, message: GetBlockBodies) {
+    if (message.hashes.isEmpty()) {
+      service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
+      return
+    }
     service.send(
       connection.agreedSubprotocolVersion(ETH62.name()),
       MessageType.BlockBodies.code,
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
index 05bfcef..c45ad6b 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
@@ -56,7 +56,7 @@ internal class EthHandler66(
     }
     return asyncCompletion {
       logger.debug("Receiving message of type {}", messageType)
-      val pair = RLP.decode(message) {
+      val pair = RLP.decodeList(message) {
         Pair(it.readValue(), it.readRemaining())
       }
       val requestIdentifier = pair.first
@@ -232,6 +232,10 @@ internal class EthHandler66(
   }
 
   private suspend fun handleGetBlockBodies(connection: WireConnection, requestIdentifier: Bytes, message: GetBlockBodies) {
+    if (message.hashes.isEmpty()) {
+      service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
+      return
+    }
     val bodies = BlockBodies(controller.findBlockBodies(message.hashes))
     service.send(
       EthSubprotocol.ETH66,
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
index 3351244..adfac50 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
@@ -42,6 +42,7 @@ import org.apache.tuweni.junit.VertxExtension
 import org.apache.tuweni.kv.MapKeyValueStore
 import org.apache.tuweni.rlp.RLP
 import org.apache.tuweni.rlpx.RLPxService
+import org.apache.tuweni.rlpx.wire.DisconnectReason
 import org.apache.tuweni.rlpx.wire.WireConnection
 import org.apache.tuweni.units.bigints.UInt256
 import org.apache.tuweni.units.bigints.UInt64
@@ -278,4 +279,16 @@ class EthHandlerTest {
     assertEquals(1, messageRead.transactionReceipts.size)
     assertEquals(txReceipt, messageRead.transactionReceipts[0][0])
   }
+
+  @Test
+  fun testGetBodiesEmpty(): Unit = runBlocking {
+    val conn = mock(WireConnection::class.java)
+    handler.handle(
+      conn,
+      MessageType.GetBlockBodies.code,
+      GetBlockBodies(listOf()).toBytes()
+    ).await()
+
+    verify(service).disconnect(conn, DisconnectReason.SUBPROTOCOL_REASON)
+  }
 }
diff --git a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt
index 3c35046..8998f36 100644
--- a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt
+++ b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt
@@ -19,10 +19,14 @@ package org.apache.tuweni.ethclient
 import io.vertx.core.Vertx
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.concurrent.AsyncResult
+import org.apache.tuweni.concurrent.coroutines.await
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.junit.BouncyCastleExtension
 import org.apache.tuweni.junit.VertxExtension
 import org.apache.tuweni.junit.VertxInstance
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotNull
 import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
@@ -37,16 +41,11 @@ class EthereumClientRunTest {
       """
       [peerRepository.default]
       type="memory"
-      [metrics]
-      networkInterface="127.0.0.1"
-      port=9091
       [storage.default]
       path="data"
       genesis="default"
       [genesis.default]
       path="classpath:/default.json"
-      [static.default]
-      peerRepository="default"
       [rlpx.default]
       networkInterface="127.0.0.1"
       port=30301
@@ -57,9 +56,6 @@ class EthereumClientRunTest {
       """
       [peerRepository.default]
       type="memory"
-      [metrics]
-      networkInterface="127.0.0.1"
-      port=9092
       [storage.default]
       path="data2"
       genesis="default"
@@ -68,14 +64,23 @@ class EthereumClientRunTest {
       [static.default]
       enodes=["enode://${keyPair.publicKey().toHexString()}@127.0.0.1:30301"]
       peerRepository="default"
+      [rlpx.default]
+      networkInterface="127.0.0.1"
+      port=30304
+      key="${SECP256K1.KeyPair.random().secretKey().bytes().toHexString()}"
       """.trimMargin()
     )
     val client1 = EthereumClient(vertx, config1)
     val client2 = EthereumClient(vertx, config2)
     client1.start()
+    val connectionInfo = AsyncResult.incomplete<EthereumConnection>()
+    client1.peerRepositories["default"]!!.addStatusListener { connectionInfo.complete(it) }
     client2.start()
-    delay(1000)
-    // TODO make sure the connection happens!
+
+    val conn = connectionInfo.await()
+    assertNotNull(conn)
+    assertNotNull(conn.status())
+    assertEquals(client2.config.rlpxServices()[0].keyPair().publicKey(), conn.peer().id().publicKey())
     client1.stop()
     client2.stop()
   }
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 8544dcb..9f1a958 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -244,6 +244,12 @@ class EthereumClient(
           adapter
         )
         services[rlpxConfig.getName()] = service
+        peerRepository.addIdentityListener {
+          service.connectTo(
+            it.publicKey(),
+            InetSocketAddress(it.networkInterface(), it.port())
+          )
+        }
         service.start().thenRun {
           logger.info("Started Ethereum client ${rlpxConfig.getName()}")
           val proxyClient = service.getClient(ProxySubprotocol.ID) as ProxyClient
@@ -265,12 +271,7 @@ class EthereumClient(
               }
             }
           }
-          peerRepository.addIdentityListener {
-            service.connectTo(
-              it.publicKey(),
-              InetSocketAddress(it.networkInterface(), it.port())
-            )
-          }
+
           val synchronizer = PeerStatusEthSynchronizer(
             repository = repository,
             client = service.getClient(ETH66) as EthRequestsManager,
@@ -293,6 +294,7 @@ class EthereumClient(
           )
           synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
           bestSynchronizer.start()
+          logger.info("Finished configuring Ethereum client ${rlpxConfig.getName()}")
         }
       }
     ).await()
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt
index edca79b..f680ec8 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt
@@ -33,10 +33,10 @@ import java.util.stream.Stream
 interface EthereumPeerRepository : PeerRepository {
   /**
    * Stores the status message sent for a connection
-   * @param connId the ID of the connection
+   * @param peerIdentity the peer identity
    * @param status the status message
    */
-  fun storeStatus(connId: String, status: Status)
+  fun storeStatus(peerIdentity: Identity, status: Status)
 
   /**
    * Provides a stream of active connections.
@@ -87,8 +87,9 @@ class MemoryEthereumPeerRepository : EthereumPeerRepository {
   val statusListeners = HashMap<String, (EthereumConnection) -> Unit>()
   val identityListeners = HashMap<String, (Identity) -> Unit>()
 
-  override fun storeStatus(connId: String, status: Status) {
-    connections[connId]?.let { conn ->
+  override fun storeStatus(peerIdentity: Identity, status: Status) {
+    val connKey = peerMap[peerIdentity]?.let { createConnectionKey(it, peerIdentity) }
+    connections[connKey]?.let { conn ->
       (conn as MemoryEthereumConnection).status = status
       statusListeners.values.forEach {
         it(conn)
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
index 3254e16..e0db77d 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
@@ -54,7 +54,9 @@ abstract class Synchronizer(
           }
         }
       }.awaitAll()
-      client.requestBlockBodies(bodiesToRequest)
+      if (!bodiesToRequest.isEmpty()) {
+        client.requestBlockBodies(bodiesToRequest)
+      }
     }
   }
 }
diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
index c50c902..ccb0112 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt
@@ -17,6 +17,7 @@
 package org.apache.tuweni.ethclient
 
 import org.apache.tuweni.devp2p.eth.Status
+import org.apache.tuweni.peer.repository.Identity
 import org.apache.tuweni.rlpx.WireConnectionRepository
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
 import org.apache.tuweni.rlpx.wire.WireConnection
@@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap
  */
 class WireConnectionPeerRepositoryAdapter(val peerRepository: EthereumPeerRepository) : WireConnectionRepository {
 
-  private val wireConnectionToIdentities = ConcurrentHashMap<String, String>()
+  private val wireConnectionToIdentities = ConcurrentHashMap<String, Identity>()
   private val connections = ConcurrentHashMap<String, WireConnection>()
   private val connectionListeners = ArrayList<WireConnectionRepository.Listener>()
 
@@ -49,7 +50,7 @@ class WireConnectionPeerRepositoryAdapter(val peerRepository: EthereumPeerReposi
     val peer = peerRepository.storePeer(id, Instant.now(), Instant.now())
     peerRepository.addConnection(peer, id)
     connections[id.id()] = wireConnection
-    wireConnectionToIdentities[wireConnection.uri()] = id.id()
+    wireConnectionToIdentities[wireConnection.uri()] = peer.id()
     wireConnection.registerListener {
       if (it == WireConnection.Event.CONNECTED) {
         for (listener in connectionListeners) {
diff --git a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
index 8a4bec0..415df60 100644
--- a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
+++ b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
@@ -38,7 +38,7 @@ final class BytesRLPReader implements RLPReader {
     if (remaining == 0) {
       return Bytes.EMPTY;
     }
-    return content.slice(remaining);
+    return content.slice(index++, remaining);
   }
 
   @Override
diff --git a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
index 6b0d245..b16f9e5 100644
--- a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
+++ b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
@@ -222,11 +222,11 @@ class BytesRLPReaderTest {
 
   @Test
   void shouldReadRemaining() {
-    Bytes input = Bytes.fromHexString("83646f6783646f67");
+    Bytes input = Bytes.fromHexString("83646f6783646f6783646f67");
     RLP.decode(input, reader -> {
       reader.readValue();
       assertEquals(4, reader.position());
-      assertEquals(Bytes.fromHexString("83646f67"), reader.readRemaining());
+      assertEquals(Bytes.fromHexString("83646f6783646f67"), reader.readRemaining());
       return null;
     });
   }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index ebae524..73e1f8b 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -205,11 +205,9 @@ public final class DefaultWireConnection implements WireConnection {
         int offset = subProtocolEntry.getKey().lowerEndpoint();
         logger.trace("Received message of type {}", message.messageId() - offset);
         SubProtocolHandler handler = subprotocols.get(subProtocolEntry.getValue());
-        try {
-          handler.handle(this, message.messageId() - offset, message.content());
-        } catch (Throwable t) {
-          logger.error("Handler " + handler.toString() + " threw an exception", t);
-        }
+        handler
+            .handle(this, message.messageId() - offset, message.content())
+            .exceptionally(t -> logger.error("Handler " + handler.toString() + " threw an exception", t));
       }
     }
   }
@@ -263,7 +261,7 @@ public final class DefaultWireConnection implements WireConnection {
 
   /**
    * Sends a ping message to the remote peer.
-   * 
+   *
    * @return a handler marking completion when a pong response is received
    */
   public AsyncCompletion sendPing() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org