You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/04 07:38:56 UTC

[incubator-tuweni] branch master updated: Expose client functionality through EthClient

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a30392  Expose client functionality through EthClient
7a30392 is described below

commit 7a303923d4e3ea215dbe2add830ef4dc31460165
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu Jun 4 00:38:43 2020 -0700

    Expose client functionality through EthClient
---
 .../tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt  |  16 ++-
 .../org/apache/tuweni/devp2p/eth/EthClient.kt      | 115 +++++++++++++++++++++
 .../org/apache/tuweni/devp2p/eth/EthController.kt  |  17 ++-
 .../org/apache/tuweni/devp2p/eth/EthHandler.kt     |  14 +--
 .../apache/tuweni/devp2p/eth/EthRequestsManager.kt |  12 ++-
 .../org/apache/tuweni/devp2p/eth/EthSubprotocol.kt |  18 +++-
 .../apache/tuweni/devp2p/eth/EthSubprotocolTest.kt |  10 +-
 eth-repository/build.gradle                        |   1 +
 .../tuweni/eth/repository/BlockchainRepository.kt  |   3 +
 .../kotlin/org/apache/tuweni/les/LESSubprotocol.kt |   5 +
 .../rlpx/MemoryWireConnectionsRepository.java      |   7 ++
 .../java/org/apache/tuweni/rlpx/RLPxService.java   |   9 ++
 .../tuweni/rlpx/WireConnectionRepository.java      |   8 ++
 .../apache/tuweni/rlpx/vertx/VertxRLPxService.java |  11 ++
 .../tuweni/rlpx/wire/DefaultWireConnection.java    |  10 ++
 .../org/apache/tuweni/rlpx/wire/SubProtocol.java   |   8 ++
 ...{WireConnection.java => SubProtocolClient.java} |  11 +-
 .../apache/tuweni/rlpx/wire/WireConnection.java    |   8 ++
 18 files changed, 234 insertions(+), 49 deletions(-)

diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
index 4233889..ce883cf 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
@@ -19,6 +19,7 @@ package org.apache.tuweni.devp2p.eth
 import io.vertx.core.Vertx
 import kotlinx.coroutines.runBlocking
 import org.apache.lucene.index.IndexWriter
+import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.coroutines.await
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.eth.genesis.GenesisFile
@@ -78,7 +79,7 @@ class EthHandlerTest {
       "Tuweni Experiment 0.1"
     )
     service.start().await()
-    val id = service.connectTo(
+    service.connectTo(
       SECP256K1.PublicKey.fromHexString(
         "b1c9e33ebfd9446151688f0abaf171dac6df31ea5205a200f2cbaf5f8be" +
           "d241c9f93732f25109e16badea1aa657a6078240657688cbbddb91a50aa8c7c34a9cc"
@@ -86,14 +87,9 @@ class EthHandlerTest {
       InetSocketAddress("192.168.88.46", 30303)
     ).await()
 
-    service.send(
-      EthSubprotocol.ETH64,
-      MessageType.GetBlockHeaders.code,
-      id,
-      GetBlockHeaders(genesisBlock.header.hash, 100, 0, false).toBytes()
-    )
+    val client = service.getClient(EthSubprotocol.ETH62) as EthRequestsManager
+    client.requestBlockHeaders(genesisBlock.header.hash, 100, 0, false).await()
 
-    Thread.sleep(3000)
     val header = repository.findBlockByHashOrNumber(UInt256.valueOf(99L).toBytes())
     Assertions.assertFalse(header.isEmpty())
 
@@ -101,8 +97,7 @@ class EthHandlerTest {
     Assertions.assertTrue(header3.isEmpty())
     val header2 = repository.findBlockByHashOrNumber(UInt256.valueOf(100L).toBytes())
     Assertions.assertTrue(header2.isEmpty())
-    service.send(EthSubprotocol.ETH64, 3, id, GetBlockHeaders(header[0], 100, 0, false).toBytes())
-    Thread.sleep(3000)
+    service.stop().await()
   }
 
   @Test
@@ -172,5 +167,6 @@ class EthHandlerTest {
     val value = service.connectTo(service2kp.publicKey(), InetSocketAddress("127.0.0.1", service2.actualPort()))
       .await()
     Assertions.assertNotNull(value)
+    AsyncCompletion.allOf(service.stop(), service2.stop()).await()
   }
 }
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
new file mode 100644
index 0000000..49fc4fb
--- /dev/null
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.eth
+
+import org.apache.tuweni.bytes.Bytes32
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion
+import org.apache.tuweni.eth.BlockBody
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.rlpx.RLPxService
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
+import org.apache.tuweni.units.bigints.UInt256
+
+class EthClient(private val service: RLPxService) : EthRequestsManager, SubProtocolClient {
+
+  override fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion {
+    val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
+    val completion = AsyncCompletion.incomplete()
+    headerRequests.computeIfAbsent(blockHash) {
+      service.send(
+        EthSubprotocol.ETH62,
+        MessageType.GetBlockHeaders.code,
+        conn!!.id(),
+        GetBlockHeaders(blockHash, maxHeaders, skip, reverse).toBytes()
+      )
+      BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+    }
+    return completion
+  }
+
+  private val headerRequests = HashMap<Bytes32, BlockHeaderRequest>()
+  private val bodiesRequests = HashMap<String, List<Hash>>()
+
+  override fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion {
+    val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
+    val blockNumberBytes = UInt256.valueOf(blockNumber).toBytes()
+    val completion = AsyncCompletion.incomplete()
+    headerRequests.computeIfAbsent(blockNumberBytes) {
+      service.send(
+        EthSubprotocol.ETH62,
+        MessageType.GetBlockHeaders.code,
+        conn!!.id(),
+        GetBlockHeaders(blockNumberBytes, maxHeaders, skip, reverse).toBytes()
+      )
+      BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+    }
+    return completion
+  }
+
+  override fun requestBlockHeader(blockHash: Hash): AsyncCompletion {
+    val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
+    val completion = AsyncCompletion.incomplete()
+    headerRequests.computeIfAbsent(blockHash) {
+      service.send(
+        EthSubprotocol.ETH62,
+        MessageType.GetBlockHeaders.code,
+        conn!!.id(),
+        GetBlockHeaders(blockHash, 1, 0, false).toBytes()
+      )
+      BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+    }
+    return completion
+  }
+
+  override fun requestBlockBodies(blockHashes: List<Hash>) {
+    val conns = service.repository().asIterable(EthSubprotocol.ETH62)
+    conns.forEach { conn ->
+      if (bodiesRequests.computeIfAbsent(conn.id()) {
+        service.send(
+          EthSubprotocol.ETH62,
+          MessageType.GetBlockBodies.code,
+          conn.id(),
+          GetBlockBodies(blockHashes).toBytes()
+        )
+        blockHashes
+      } == blockHashes) {
+        return
+      }
+    }
+  }
+
+  override fun requestBlock(blockHash: Hash) {
+    requestBlockHeader(blockHash)
+    requestBlockBodies(listOf(blockHash))
+  }
+
+  override fun wasRequested(connectionId: String, header: BlockHeader): CompletableAsyncCompletion? {
+    val request = headerRequests.remove(header.hash) ?: return null
+    if (request.connectionId == connectionId) {
+      return request.handle
+    } else {
+      return null
+    }
+  }
+
+  override fun wasRequested(connectionId: String, bodies: List<BlockBody>): List<Hash>? =
+    bodiesRequests.get(connectionId)
+}
+
+internal data class BlockHeaderRequest(val connectionId: String, val handle: CompletableAsyncCompletion)
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
index 005c33d..e593a3c 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -90,16 +90,23 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
   }
 
   private fun requestBlockBody(blockHash: Hash) {
-    requestsManager.requestBlockBody(blockHash)
+    requestsManager.requestBlockBodies(listOf(blockHash))
   }
 
   suspend fun addNewBlockHeaders(connectionId: String, headers: List<BlockHeader>) {
+    val handle = requestsManager.wasRequested(connectionId, headers.first()) ?: return
     headers.forEach { header ->
-      if (!requestsManager.wasRequested(connectionId, header)) {
-        return
-      }
-
       repository.storeBlockHeader(header)
     }
+    handle.complete()
+  }
+
+  suspend fun addNewBlockBodies(connectionId: String, bodies: List<BlockBody>) {
+    val hashes = requestsManager.wasRequested(connectionId, bodies)
+    if (hashes != null) {
+      for (i in 0..hashes.size) {
+        repository.storeBlockBody(hashes[i], bodies[i])
+      }
+    }
   }
 }
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index c7a5fcd..86ec780 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -50,7 +50,7 @@ internal class EthHandler(
       MessageType.GetBlockHeaders.code -> handleGetBlockHeaders(connectionId, GetBlockHeaders.read(message))
       MessageType.BlockHeaders.code -> handleHeaders(connectionId, BlockHeaders.read(message))
       MessageType.GetBlockBodies.code -> handleGetBlockBodies(connectionId, GetBlockBodies.read(message))
-      MessageType.BlockBodies.code -> handleBlockBodies(BlockBodies.read(message))
+      MessageType.BlockBodies.code -> handleBlockBodies(connectionId, BlockBodies.read(message))
       MessageType.NewBlock.code -> handleNewBlock(NewBlock.read(message))
       MessageType.GetNodeData.code -> handleGetNodeData(connectionId, GetNodeData.read(message))
 //    MessageType.NodeData.code-> // not implemented yet.
@@ -92,14 +92,8 @@ internal class EthHandler(
     controller.addNewBlock(read.block)
   }
 
-  private fun handleBlockBodies(message: BlockBodies) {
-    message.bodies.forEach {
-      //      if (blockBodyRequests.remove(it)) {
-//        repository.
-//      } else {
-//        service.disconnect(connectionId, DisconnectReason.PROTOCOL_BREACH)
-//      }
-    }
+  private suspend fun handleBlockBodies(connectionId: String, message: BlockBodies) {
+    controller.addNewBlockBodies(connectionId, message.bodies)
   }
 
   private suspend fun handleGetBlockBodies(connectionId: String, message: GetBlockBodies) {
@@ -144,14 +138,12 @@ internal class EthHandler(
   }
 
   override fun stop() = asyncCompletion {
-    TODO("not implemented") // To change body of created functions use File | Settings | File Templates.
   }
 }
 
 class PeerInfo() {
 
   val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
-  // TODO disconnect if not responding after timeout.
 
   fun connect() {
     ready.complete()
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
index 1234ab9..3dc3181 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -16,17 +16,21 @@
  */
 package org.apache.tuweni.devp2p.eth
 
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion
 import org.apache.tuweni.eth.BlockBody
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
 
 interface EthRequestsManager {
-  fun requestBlockHeader(blockHash: Hash)
+  fun requestBlockHeader(blockHash: Hash): AsyncCompletion
+  fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
+  fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
 
-  fun requestBlockBody(blockHash: Hash)
+  fun requestBlockBodies(blockHashes: List<Hash>)
 
   fun requestBlock(blockHash: Hash)
 
-  fun wasRequested(connectionId: String, header: BlockHeader): Boolean
-  fun wasRequested(connectionId: String, header: BlockBody): Boolean
+  fun wasRequested(connectionId: String, header: BlockHeader): CompletableAsyncCompletion?
+  fun wasRequested(connectionId: String, bodies: List<BlockBody>): List<Hash>?
 }
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
index e30e99c..981f7f3 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
@@ -20,6 +20,7 @@ import kotlinx.coroutines.Dispatchers
 import org.apache.tuweni.eth.repository.BlockchainRepository
 import org.apache.tuweni.rlpx.RLPxService
 import org.apache.tuweni.rlpx.wire.SubProtocol
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
 import org.apache.tuweni.rlpx.wire.SubProtocolHandler
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
 import kotlin.coroutines.CoroutineContext
@@ -27,9 +28,11 @@ import kotlin.coroutines.CoroutineContext
 class EthSubprotocol(
   private val coroutineContext: CoroutineContext = Dispatchers.Default,
   private val blockchainInfo: BlockchainInformation,
-  private val repository: BlockchainRepository,
-  private val requestsManager: EthRequestsManager
+  private val repository: BlockchainRepository
 ) : SubProtocol {
+
+  private var client: EthClient? = null
+
   companion object {
     val ETH62 = SubProtocolIdentifier.of("eth", 62)
     val ETH63 = SubProtocolIdentifier.of("eth", 63)
@@ -51,9 +54,18 @@ class EthSubprotocol(
   }
 
   override fun createHandler(service: RLPxService): SubProtocolHandler {
-    val controller = EthController(repository, requestsManager)
+    val controller = EthController(repository, createClient(service) as EthRequestsManager)
     return EthHandler(coroutineContext, blockchainInfo, service, controller)
   }
 
   override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64)
+
+  override fun createClient(service: RLPxService): SubProtocolClient {
+    if (client == null) {
+      val c = EthClient(service)
+      client = c
+      return c
+    }
+    return client!!
+  }
 }
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
index 5cc07ef..2085835 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.Assertions.assertFalse
 import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
-import org.mockito.Mockito.mock
 
 @ExtendWith(BouncyCastleExtension::class, VertxExtension::class, LuceneIndexWriterExtension::class)
 class EthSubprotocolTest {
@@ -57,8 +56,7 @@ class EthSubprotocolTest {
     )
     val eth = EthSubprotocol(
       blockchainInfo = blockchainInfo,
-      repository = repository,
-      requestsManager = mock(EthRequestsManager::class.java)
+      repository = repository
     )
     assertEquals(SubProtocolIdentifier.of("eth", 64), eth.id())
   }
@@ -74,8 +72,7 @@ class EthSubprotocolTest {
     )
     val eth = EthSubprotocol(
       blockchainInfo = blockchainInfo,
-      repository = repository,
-      requestsManager = mock(EthRequestsManager::class.java)
+      repository = repository
     )
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 64)))
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 63)))
@@ -95,8 +92,7 @@ class EthSubprotocolTest {
     )
     val eth = EthSubprotocol(
       blockchainInfo = blockchainInfo,
-      repository = repository,
-      requestsManager = mock(EthRequestsManager::class.java)
+      repository = repository
     )
     assertEquals(8, eth.versionRange(62))
     assertEquals(17, eth.versionRange(63))
diff --git a/eth-repository/build.gradle b/eth-repository/build.gradle
index 64d674e..babb85b 100644
--- a/eth-repository/build.gradle
+++ b/eth-repository/build.gradle
@@ -17,6 +17,7 @@ dependencies {
   compile project(':eth')
   compile project(':kv')
   compile 'org.apache.lucene:lucene-core'
+  compile 'org.slf4j:slf4j-api'
 
 
   testCompile project(':junit')
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index 73dbf32..21b5d02 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -23,6 +23,7 @@ import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.kv.KeyValueStore
+import org.slf4j.LoggerFactory
 
 /**
  * Repository housing blockchain information.
@@ -48,6 +49,7 @@ class BlockchainRepository
 
   companion object {
 
+    val logger = LoggerFactory.getLogger(BlockchainRepository::class.java)
     val GENESIS_BLOCK = Bytes.wrap("genesisBlock".toByteArray())
 
     /**
@@ -123,6 +125,7 @@ class BlockchainRepository
   suspend fun storeBlockHeader(header: BlockHeader) {
     blockHeaderStore.put(header.hash, header.toBytes())
     indexBlockHeader(header)
+    logger.debug("Stored header {}", header.hash)
   }
 
   private suspend fun indexBlockHeader(header: BlockHeader) {
diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
index 726333f..02a4a99 100644
--- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
+++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
@@ -20,6 +20,7 @@ import kotlinx.coroutines.Dispatchers
 import org.apache.tuweni.eth.repository.BlockchainRepository
 import org.apache.tuweni.rlpx.RLPxService
 import org.apache.tuweni.rlpx.wire.SubProtocol
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
 import org.apache.tuweni.rlpx.wire.SubProtocolHandler
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
 import org.apache.tuweni.units.bigints.UInt256
@@ -51,6 +52,10 @@ class LESSubprotocol
    private val repo: BlockchainRepository
  ) : SubProtocol {
 
+  override fun createClient(service: RLPxService): SubProtocolClient {
+    TODO("not implemented")
+  }
+
   override fun getCapabilities(): MutableList<SubProtocolIdentifier> = mutableListOf(SubProtocolIdentifier.of("les", 2))
 
   override fun id(): SubProtocolIdentifier {
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
index d80cd52..a54964c 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
@@ -12,10 +12,12 @@
  */
 package org.apache.tuweni.rlpx;
 
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 import org.apache.tuweni.rlpx.wire.WireConnection;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * In-memory implementation of the wire connections repository.
@@ -41,6 +43,11 @@ public class MemoryWireConnectionsRepository implements WireConnectionRepository
   }
 
   @Override
+  public Iterable<WireConnection> asIterable(SubProtocolIdentifier identifier) {
+    return connections.values().stream().filter(conn -> conn.supports(identifier)).collect(Collectors.toList());
+  }
+
+  @Override
   public void close() {
     connections.clear();
   }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
index f5e3b7c..f15380e 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
@@ -17,6 +17,7 @@ import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.crypto.SECP256K1;
 import org.apache.tuweni.rlpx.wire.DisconnectReason;
+import org.apache.tuweni.rlpx.wire.SubProtocolClient;
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 
 import java.net.InetSocketAddress;
@@ -83,4 +84,12 @@ public interface RLPxService {
    * @return the repository of wire connections associated with this service.
    */
   WireConnectionRepository repository();
+
+  /**
+   * Gets a subprotocol client associated with the given subprotocol.
+   * 
+   * @param subProtocolIdentifier the subprotocol identifier
+   * @return the client of the subprotocol.
+   */
+  SubProtocolClient getClient(SubProtocolIdentifier subProtocolIdentifier);
 }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
index beb9240..0264a5a 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
@@ -12,6 +12,7 @@
  */
 package org.apache.tuweni.rlpx;
 
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 import org.apache.tuweni.rlpx.wire.WireConnection;
 
 /**
@@ -44,6 +45,13 @@ public interface WireConnectionRepository {
   Iterable<WireConnection> asIterable();
 
   /**
+   * Provides a subset of wire connections with a particular capabilities.
+   *
+   * @return an Iterable object allowing to traverse all wire connections held by this repository
+   */
+  Iterable<WireConnection> asIterable(SubProtocolIdentifier identifier);
+
+  /**
    * Closes the repository. After it has been closed, the repository should no longer be able to add or retrieve
    * connections.
    *
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index e96e7c5..3dd3fa3 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -29,6 +29,7 @@ import org.apache.tuweni.rlpx.WireConnectionRepository;
 import org.apache.tuweni.rlpx.wire.DefaultWireConnection;
 import org.apache.tuweni.rlpx.wire.DisconnectReason;
 import org.apache.tuweni.rlpx.wire.SubProtocol;
+import org.apache.tuweni.rlpx.wire.SubProtocolClient;
 import org.apache.tuweni.rlpx.wire.SubProtocolHandler;
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 import org.apache.tuweni.rlpx.wire.WireConnection;
@@ -284,6 +285,16 @@ public final class VertxRLPxService implements RLPxService {
   }
 
   @Override
+  public SubProtocolClient getClient(SubProtocolIdentifier subProtocolIdentifier) {
+    for (SubProtocol subProtocol : subProtocols) {
+      if (subProtocol.supports(subProtocolIdentifier)) {
+        return subProtocol.createClient(this);
+      }
+    }
+    return null;
+  }
+
+  @Override
   public AsyncResult<String> connectTo(PublicKey peerPublicKey, InetSocketAddress peerAddress) {
     if (!started.get()) {
       throw new IllegalStateException("The RLPx service is not active");
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index 3a246c4..7d60eb1 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -260,6 +260,16 @@ public final class DefaultWireConnection implements WireConnection {
     return id;
   }
 
+  @Override
+  public boolean supports(SubProtocolIdentifier subProtocolIdentifier) {
+    for (SubProtocol sp : subprotocolRangeMap.asMapOfRanges().values()) {
+      if (sp.supports(subProtocolIdentifier)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @SuppressWarnings("CatchAndPrintStackTrace")
   public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int messageType, Bytes message) {
     logger.debug("Sending sub-protocol message {} {}", messageType, message);
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
index fd951de..84741e2 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
@@ -50,6 +50,14 @@ public interface SubProtocol {
   SubProtocolHandler createHandler(RLPxService service);
 
   /**
+   * Creates a new client for the subprotocol.
+   *
+   * @param service the rlpx service that will use the handler
+   * @return a new client for the subprotocol, bound to the service.
+   */
+  SubProtocolClient createClient(RLPxService service);
+
+  /**
    * Provides the capabilities supported by the subprotocol.
    * 
    * @return the capabilities for this protocol, ordered.
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolClient.java
similarity index 80%
copy from rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
copy to rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolClient.java
index 3efeda7..03aad3c 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolClient.java
@@ -12,15 +12,8 @@
  */
 package org.apache.tuweni.rlpx.wire;
 
-
 /**
- * A stateful connection between two peers under the Devp2p wire protocol.
+ * Subprotocol client allowing outside users to call functions associated with the subprotocol.
  */
-public interface WireConnection {
-
-  /**
-   *
-   * @return the identifier of this wire connection
-   */
-  public String id();
+public interface SubProtocolClient {
 }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
index 3efeda7..6d97194 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
@@ -23,4 +23,12 @@ public interface WireConnection {
    * @return the identifier of this wire connection
    */
   public String id();
+
+  /**
+   * Returns true if the connection supports the subprotocol
+   * 
+   * @param subProtocolIdentifier the subprotocol identifier
+   * @return true if the subprotocol is supported
+   */
+  public boolean supports(SubProtocolIdentifier subProtocolIdentifier);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org