You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/04 07:38:56 UTC
[incubator-tuweni] branch master updated: Expose client
functionality through EthClient
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 7a30392 Expose client functionality through EthClient
7a30392 is described below
commit 7a303923d4e3ea215dbe2add830ef4dc31460165
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu Jun 4 00:38:43 2020 -0700
Expose client functionality through EthClient
---
.../tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt | 16 ++-
.../org/apache/tuweni/devp2p/eth/EthClient.kt | 115 +++++++++++++++++++++
.../org/apache/tuweni/devp2p/eth/EthController.kt | 17 ++-
.../org/apache/tuweni/devp2p/eth/EthHandler.kt | 14 +--
.../apache/tuweni/devp2p/eth/EthRequestsManager.kt | 12 ++-
.../org/apache/tuweni/devp2p/eth/EthSubprotocol.kt | 18 +++-
.../apache/tuweni/devp2p/eth/EthSubprotocolTest.kt | 10 +-
eth-repository/build.gradle | 1 +
.../tuweni/eth/repository/BlockchainRepository.kt | 3 +
.../kotlin/org/apache/tuweni/les/LESSubprotocol.kt | 5 +
.../rlpx/MemoryWireConnectionsRepository.java | 7 ++
.../java/org/apache/tuweni/rlpx/RLPxService.java | 9 ++
.../tuweni/rlpx/WireConnectionRepository.java | 8 ++
.../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 11 ++
.../tuweni/rlpx/wire/DefaultWireConnection.java | 10 ++
.../org/apache/tuweni/rlpx/wire/SubProtocol.java | 8 ++
...{WireConnection.java => SubProtocolClient.java} | 11 +-
.../apache/tuweni/rlpx/wire/WireConnection.java | 8 ++
18 files changed, 234 insertions(+), 49 deletions(-)
diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
index 4233889..ce883cf 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
@@ -19,6 +19,7 @@ package org.apache.tuweni.devp2p.eth
import io.vertx.core.Vertx
import kotlinx.coroutines.runBlocking
import org.apache.lucene.index.IndexWriter
+import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.coroutines.await
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.eth.genesis.GenesisFile
@@ -78,7 +79,7 @@ class EthHandlerTest {
"Tuweni Experiment 0.1"
)
service.start().await()
- val id = service.connectTo(
+ service.connectTo(
SECP256K1.PublicKey.fromHexString(
"b1c9e33ebfd9446151688f0abaf171dac6df31ea5205a200f2cbaf5f8be" +
"d241c9f93732f25109e16badea1aa657a6078240657688cbbddb91a50aa8c7c34a9cc"
@@ -86,14 +87,9 @@ class EthHandlerTest {
InetSocketAddress("192.168.88.46", 30303)
).await()
- service.send(
- EthSubprotocol.ETH64,
- MessageType.GetBlockHeaders.code,
- id,
- GetBlockHeaders(genesisBlock.header.hash, 100, 0, false).toBytes()
- )
+ val client = service.getClient(EthSubprotocol.ETH62) as EthRequestsManager
+ client.requestBlockHeaders(genesisBlock.header.hash, 100, 0, false).await()
- Thread.sleep(3000)
val header = repository.findBlockByHashOrNumber(UInt256.valueOf(99L).toBytes())
Assertions.assertFalse(header.isEmpty())
@@ -101,8 +97,7 @@ class EthHandlerTest {
Assertions.assertTrue(header3.isEmpty())
val header2 = repository.findBlockByHashOrNumber(UInt256.valueOf(100L).toBytes())
Assertions.assertTrue(header2.isEmpty())
- service.send(EthSubprotocol.ETH64, 3, id, GetBlockHeaders(header[0], 100, 0, false).toBytes())
- Thread.sleep(3000)
+ service.stop().await()
}
@Test
@@ -172,5 +167,6 @@ class EthHandlerTest {
val value = service.connectTo(service2kp.publicKey(), InetSocketAddress("127.0.0.1", service2.actualPort()))
.await()
Assertions.assertNotNull(value)
+ AsyncCompletion.allOf(service.stop(), service2.stop()).await()
}
}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
new file mode 100644
index 0000000..49fc4fb
--- /dev/null
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.eth
+
+import org.apache.tuweni.bytes.Bytes32
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion
+import org.apache.tuweni.eth.BlockBody
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.rlpx.RLPxService
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
+import org.apache.tuweni.units.bigints.UInt256
+
+class EthClient(private val service: RLPxService) : EthRequestsManager, SubProtocolClient {
+
+ override fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion {
+ val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
+ val completion = AsyncCompletion.incomplete()
+ headerRequests.computeIfAbsent(blockHash) {
+ service.send(
+ EthSubprotocol.ETH62,
+ MessageType.GetBlockHeaders.code,
+ conn!!.id(),
+ GetBlockHeaders(blockHash, maxHeaders, skip, reverse).toBytes()
+ )
+ BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+ }
+ return completion
+ }
+
+ private val headerRequests = HashMap<Bytes32, BlockHeaderRequest>()
+ private val bodiesRequests = HashMap<String, List<Hash>>()
+
+ override fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion {
+ val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
+ val blockNumberBytes = UInt256.valueOf(blockNumber).toBytes()
+ val completion = AsyncCompletion.incomplete()
+ headerRequests.computeIfAbsent(blockNumberBytes) {
+ service.send(
+ EthSubprotocol.ETH62,
+ MessageType.GetBlockHeaders.code,
+ conn!!.id(),
+ GetBlockHeaders(blockNumberBytes, maxHeaders, skip, reverse).toBytes()
+ )
+ BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+ }
+ return completion
+ }
+
+ override fun requestBlockHeader(blockHash: Hash): AsyncCompletion {
+ val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
+ val completion = AsyncCompletion.incomplete()
+ headerRequests.computeIfAbsent(blockHash) {
+ service.send(
+ EthSubprotocol.ETH62,
+ MessageType.GetBlockHeaders.code,
+ conn!!.id(),
+ GetBlockHeaders(blockHash, 1, 0, false).toBytes()
+ )
+ BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+ }
+ return completion
+ }
+
+ override fun requestBlockBodies(blockHashes: List<Hash>) {
+ val conns = service.repository().asIterable(EthSubprotocol.ETH62)
+ conns.forEach { conn ->
+ if (bodiesRequests.computeIfAbsent(conn.id()) {
+ service.send(
+ EthSubprotocol.ETH62,
+ MessageType.GetBlockBodies.code,
+ conn.id(),
+ GetBlockBodies(blockHashes).toBytes()
+ )
+ blockHashes
+ } == blockHashes) {
+ return
+ }
+ }
+ }
+
+ override fun requestBlock(blockHash: Hash) {
+ requestBlockHeader(blockHash)
+ requestBlockBodies(listOf(blockHash))
+ }
+
+ override fun wasRequested(connectionId: String, header: BlockHeader): CompletableAsyncCompletion? {
+ val request = headerRequests.remove(header.hash) ?: return null
+ if (request.connectionId == connectionId) {
+ return request.handle
+ } else {
+ return null
+ }
+ }
+
+ override fun wasRequested(connectionId: String, bodies: List<BlockBody>): List<Hash>? =
+ bodiesRequests.get(connectionId)
+}
+
+internal data class BlockHeaderRequest(val connectionId: String, val handle: CompletableAsyncCompletion)
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
index 005c33d..e593a3c 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -90,16 +90,23 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
}
private fun requestBlockBody(blockHash: Hash) {
- requestsManager.requestBlockBody(blockHash)
+ requestsManager.requestBlockBodies(listOf(blockHash))
}
suspend fun addNewBlockHeaders(connectionId: String, headers: List<BlockHeader>) {
+ val handle = requestsManager.wasRequested(connectionId, headers.first()) ?: return
headers.forEach { header ->
- if (!requestsManager.wasRequested(connectionId, header)) {
- return
- }
-
repository.storeBlockHeader(header)
}
+ handle.complete()
+ }
+
+ suspend fun addNewBlockBodies(connectionId: String, bodies: List<BlockBody>) {
+ val hashes = requestsManager.wasRequested(connectionId, bodies)
+ if (hashes != null) {
+ for (i in 0..hashes.size) {
+ repository.storeBlockBody(hashes[i], bodies[i])
+ }
+ }
}
}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index c7a5fcd..86ec780 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -50,7 +50,7 @@ internal class EthHandler(
MessageType.GetBlockHeaders.code -> handleGetBlockHeaders(connectionId, GetBlockHeaders.read(message))
MessageType.BlockHeaders.code -> handleHeaders(connectionId, BlockHeaders.read(message))
MessageType.GetBlockBodies.code -> handleGetBlockBodies(connectionId, GetBlockBodies.read(message))
- MessageType.BlockBodies.code -> handleBlockBodies(BlockBodies.read(message))
+ MessageType.BlockBodies.code -> handleBlockBodies(connectionId, BlockBodies.read(message))
MessageType.NewBlock.code -> handleNewBlock(NewBlock.read(message))
MessageType.GetNodeData.code -> handleGetNodeData(connectionId, GetNodeData.read(message))
// MessageType.NodeData.code-> // not implemented yet.
@@ -92,14 +92,8 @@ internal class EthHandler(
controller.addNewBlock(read.block)
}
- private fun handleBlockBodies(message: BlockBodies) {
- message.bodies.forEach {
- // if (blockBodyRequests.remove(it)) {
-// repository.
-// } else {
-// service.disconnect(connectionId, DisconnectReason.PROTOCOL_BREACH)
-// }
- }
+ private suspend fun handleBlockBodies(connectionId: String, message: BlockBodies) {
+ controller.addNewBlockBodies(connectionId, message.bodies)
}
private suspend fun handleGetBlockBodies(connectionId: String, message: GetBlockBodies) {
@@ -144,14 +138,12 @@ internal class EthHandler(
}
override fun stop() = asyncCompletion {
- TODO("not implemented") // To change body of created functions use File | Settings | File Templates.
}
}
class PeerInfo() {
val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
- // TODO disconnect if not responding after timeout.
fun connect() {
ready.complete()
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
index 1234ab9..3dc3181 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -16,17 +16,21 @@
*/
package org.apache.tuweni.devp2p.eth
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion
import org.apache.tuweni.eth.BlockBody
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
interface EthRequestsManager {
- fun requestBlockHeader(blockHash: Hash)
+ fun requestBlockHeader(blockHash: Hash): AsyncCompletion
+ fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
+ fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
- fun requestBlockBody(blockHash: Hash)
+ fun requestBlockBodies(blockHashes: List<Hash>)
fun requestBlock(blockHash: Hash)
- fun wasRequested(connectionId: String, header: BlockHeader): Boolean
- fun wasRequested(connectionId: String, header: BlockBody): Boolean
+ fun wasRequested(connectionId: String, header: BlockHeader): CompletableAsyncCompletion?
+ fun wasRequested(connectionId: String, bodies: List<BlockBody>): List<Hash>?
}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
index e30e99c..981f7f3 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
@@ -20,6 +20,7 @@ import kotlinx.coroutines.Dispatchers
import org.apache.tuweni.eth.repository.BlockchainRepository
import org.apache.tuweni.rlpx.RLPxService
import org.apache.tuweni.rlpx.wire.SubProtocol
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
import org.apache.tuweni.rlpx.wire.SubProtocolHandler
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
import kotlin.coroutines.CoroutineContext
@@ -27,9 +28,11 @@ import kotlin.coroutines.CoroutineContext
class EthSubprotocol(
private val coroutineContext: CoroutineContext = Dispatchers.Default,
private val blockchainInfo: BlockchainInformation,
- private val repository: BlockchainRepository,
- private val requestsManager: EthRequestsManager
+ private val repository: BlockchainRepository
) : SubProtocol {
+
+ private var client: EthClient? = null
+
companion object {
val ETH62 = SubProtocolIdentifier.of("eth", 62)
val ETH63 = SubProtocolIdentifier.of("eth", 63)
@@ -51,9 +54,18 @@ class EthSubprotocol(
}
override fun createHandler(service: RLPxService): SubProtocolHandler {
- val controller = EthController(repository, requestsManager)
+ val controller = EthController(repository, createClient(service) as EthRequestsManager)
return EthHandler(coroutineContext, blockchainInfo, service, controller)
}
override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64)
+
+ override fun createClient(service: RLPxService): SubProtocolClient {
+ if (client == null) {
+ val c = EthClient(service)
+ client = c
+ return c
+ }
+ return client!!
+ }
}
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
index 5cc07ef..2085835 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
-import org.mockito.Mockito.mock
@ExtendWith(BouncyCastleExtension::class, VertxExtension::class, LuceneIndexWriterExtension::class)
class EthSubprotocolTest {
@@ -57,8 +56,7 @@ class EthSubprotocolTest {
)
val eth = EthSubprotocol(
blockchainInfo = blockchainInfo,
- repository = repository,
- requestsManager = mock(EthRequestsManager::class.java)
+ repository = repository
)
assertEquals(SubProtocolIdentifier.of("eth", 64), eth.id())
}
@@ -74,8 +72,7 @@ class EthSubprotocolTest {
)
val eth = EthSubprotocol(
blockchainInfo = blockchainInfo,
- repository = repository,
- requestsManager = mock(EthRequestsManager::class.java)
+ repository = repository
)
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 64)))
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 63)))
@@ -95,8 +92,7 @@ class EthSubprotocolTest {
)
val eth = EthSubprotocol(
blockchainInfo = blockchainInfo,
- repository = repository,
- requestsManager = mock(EthRequestsManager::class.java)
+ repository = repository
)
assertEquals(8, eth.versionRange(62))
assertEquals(17, eth.versionRange(63))
diff --git a/eth-repository/build.gradle b/eth-repository/build.gradle
index 64d674e..babb85b 100644
--- a/eth-repository/build.gradle
+++ b/eth-repository/build.gradle
@@ -17,6 +17,7 @@ dependencies {
compile project(':eth')
compile project(':kv')
compile 'org.apache.lucene:lucene-core'
+ compile 'org.slf4j:slf4j-api'
testCompile project(':junit')
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index 73dbf32..21b5d02 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -23,6 +23,7 @@ import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.TransactionReceipt
import org.apache.tuweni.kv.KeyValueStore
+import org.slf4j.LoggerFactory
/**
* Repository housing blockchain information.
@@ -48,6 +49,7 @@ class BlockchainRepository
companion object {
+ val logger = LoggerFactory.getLogger(BlockchainRepository::class.java)
val GENESIS_BLOCK = Bytes.wrap("genesisBlock".toByteArray())
/**
@@ -123,6 +125,7 @@ class BlockchainRepository
suspend fun storeBlockHeader(header: BlockHeader) {
blockHeaderStore.put(header.hash, header.toBytes())
indexBlockHeader(header)
+ logger.debug("Stored header {}", header.hash)
}
private suspend fun indexBlockHeader(header: BlockHeader) {
diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
index 726333f..02a4a99 100644
--- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
+++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
@@ -20,6 +20,7 @@ import kotlinx.coroutines.Dispatchers
import org.apache.tuweni.eth.repository.BlockchainRepository
import org.apache.tuweni.rlpx.RLPxService
import org.apache.tuweni.rlpx.wire.SubProtocol
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
import org.apache.tuweni.rlpx.wire.SubProtocolHandler
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
import org.apache.tuweni.units.bigints.UInt256
@@ -51,6 +52,10 @@ class LESSubprotocol
private val repo: BlockchainRepository
) : SubProtocol {
+ override fun createClient(service: RLPxService): SubProtocolClient {
+ TODO("not implemented")
+ }
+
override fun getCapabilities(): MutableList<SubProtocolIdentifier> = mutableListOf(SubProtocolIdentifier.of("les", 2))
override fun id(): SubProtocolIdentifier {
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
index d80cd52..a54964c 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/MemoryWireConnectionsRepository.java
@@ -12,10 +12,12 @@
*/
package org.apache.tuweni.rlpx;
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import org.apache.tuweni.rlpx.wire.WireConnection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/**
* In-memory implementation of the wire connections repository.
@@ -41,6 +43,11 @@ public class MemoryWireConnectionsRepository implements WireConnectionRepository
}
@Override
+ public Iterable<WireConnection> asIterable(SubProtocolIdentifier identifier) {
+ return connections.values().stream().filter(conn -> conn.supports(identifier)).collect(Collectors.toList());
+ }
+
+ @Override
public void close() {
connections.clear();
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
index f5e3b7c..f15380e 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
@@ -17,6 +17,7 @@ import org.apache.tuweni.concurrent.AsyncCompletion;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.crypto.SECP256K1;
import org.apache.tuweni.rlpx.wire.DisconnectReason;
+import org.apache.tuweni.rlpx.wire.SubProtocolClient;
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import java.net.InetSocketAddress;
@@ -83,4 +84,12 @@ public interface RLPxService {
* @return the repository of wire connections associated with this service.
*/
WireConnectionRepository repository();
+
+ /**
+ * Gets a subprotocol client associated with the given subprotocol.
+ *
+ * @param subProtocolIdentifier the subprotocol identifier
+ * @return the client of the subprotocol.
+ */
+ SubProtocolClient getClient(SubProtocolIdentifier subProtocolIdentifier);
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
index beb9240..0264a5a 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/WireConnectionRepository.java
@@ -12,6 +12,7 @@
*/
package org.apache.tuweni.rlpx;
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import org.apache.tuweni.rlpx.wire.WireConnection;
/**
@@ -44,6 +45,13 @@ public interface WireConnectionRepository {
Iterable<WireConnection> asIterable();
/**
+ * Provides a subset of wire connections with a particular capabilities.
+ *
+ * @return an Iterable object allowing to traverse all wire connections held by this repository
+ */
+ Iterable<WireConnection> asIterable(SubProtocolIdentifier identifier);
+
+ /**
* Closes the repository. After it has been closed, the repository should no longer be able to add or retrieve
* connections.
*
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index e96e7c5..3dd3fa3 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -29,6 +29,7 @@ import org.apache.tuweni.rlpx.WireConnectionRepository;
import org.apache.tuweni.rlpx.wire.DefaultWireConnection;
import org.apache.tuweni.rlpx.wire.DisconnectReason;
import org.apache.tuweni.rlpx.wire.SubProtocol;
+import org.apache.tuweni.rlpx.wire.SubProtocolClient;
import org.apache.tuweni.rlpx.wire.SubProtocolHandler;
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import org.apache.tuweni.rlpx.wire.WireConnection;
@@ -284,6 +285,16 @@ public final class VertxRLPxService implements RLPxService {
}
@Override
+ public SubProtocolClient getClient(SubProtocolIdentifier subProtocolIdentifier) {
+ for (SubProtocol subProtocol : subProtocols) {
+ if (subProtocol.supports(subProtocolIdentifier)) {
+ return subProtocol.createClient(this);
+ }
+ }
+ return null;
+ }
+
+ @Override
public AsyncResult<String> connectTo(PublicKey peerPublicKey, InetSocketAddress peerAddress) {
if (!started.get()) {
throw new IllegalStateException("The RLPx service is not active");
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index 3a246c4..7d60eb1 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -260,6 +260,16 @@ public final class DefaultWireConnection implements WireConnection {
return id;
}
+ @Override
+ public boolean supports(SubProtocolIdentifier subProtocolIdentifier) {
+ for (SubProtocol sp : subprotocolRangeMap.asMapOfRanges().values()) {
+ if (sp.supports(subProtocolIdentifier)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@SuppressWarnings("CatchAndPrintStackTrace")
public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int messageType, Bytes message) {
logger.debug("Sending sub-protocol message {} {}", messageType, message);
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
index fd951de..84741e2 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
@@ -50,6 +50,14 @@ public interface SubProtocol {
SubProtocolHandler createHandler(RLPxService service);
/**
+ * Creates a new client for the subprotocol.
+ *
+ * @param service the rlpx service that will use the handler
+ * @return a new client for the subprotocol, bound to the service.
+ */
+ SubProtocolClient createClient(RLPxService service);
+
+ /**
* Provides the capabilities supported by the subprotocol.
*
* @return the capabilities for this protocol, ordered.
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolClient.java
similarity index 80%
copy from rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
copy to rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolClient.java
index 3efeda7..03aad3c 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolClient.java
@@ -12,15 +12,8 @@
*/
package org.apache.tuweni.rlpx.wire;
-
/**
- * A stateful connection between two peers under the Devp2p wire protocol.
+ * Subprotocol client allowing outside users to call functions associated with the subprotocol.
*/
-public interface WireConnection {
-
- /**
- *
- * @return the identifier of this wire connection
- */
- public String id();
+public interface SubProtocolClient {
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
index 3efeda7..6d97194 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/WireConnection.java
@@ -23,4 +23,12 @@ public interface WireConnection {
* @return the identifier of this wire connection
*/
public String id();
+
+ /**
+ * Returns true if the connection supports the subprotocol
+ *
+ * @param subProtocolIdentifier the subprotocol identifier
+ * @return true if the subprotocol is supported
+ */
+ public boolean supports(SubProtocolIdentifier subProtocolIdentifier);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org