You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/03 08:46:41 UTC
[incubator-tuweni] branch master updated: Add a controller and a
requests manager
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 3c856d5 Add a controller and a requests manager
3c856d5 is described below
commit 3c856d57b7fb1751500ad6ac60222f1081b96453
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed Jun 3 01:45:05 2020 -0700
Add a controller and a requests manager
---
.../org/apache/tuweni/devp2p/eth/EthController.kt | 105 +++++++++++++++++++++
.../org/apache/tuweni/devp2p/eth/EthHandler.kt | 94 +++++-------------
.../apache/tuweni/devp2p/eth/EthRequestsManager.kt | 32 +++++++
.../org/apache/tuweni/devp2p/eth/EthSubprotocol.kt | 9 +-
.../org/apache/tuweni/devp2p/eth/EthHandlerTest.kt | 3 +-
.../apache/tuweni/devp2p/eth/EthSubprotocolTest.kt | 19 +++-
.../tuweni/eth/repository/BlockchainRepository.kt | 21 +----
7 files changed, 187 insertions(+), 96 deletions(-)
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
new file mode 100644
index 0000000..005c33d
--- /dev/null
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.eth
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.eth.Block
+import org.apache.tuweni.eth.BlockBody
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.TransactionReceipt
+import org.apache.tuweni.eth.repository.BlockchainRepository
+
+class EthController(val repository: BlockchainRepository, val requestsManager: EthRequestsManager) {
+
+ suspend fun findTransactionReceipts(hashes: List<Hash>): List<List<TransactionReceipt>> {
+ val receipts = ArrayList<List<TransactionReceipt>>()
+ hashes.forEach {
+ receipts.add(repository.retrieveTransactionReceipts(it))
+ }
+ return receipts
+ }
+
+ suspend fun addNewBlock(newBlock: Block) {
+ repository.storeBlock(newBlock)
+ }
+
+ suspend fun findBlockBodies(hashes: List<Hash>): List<BlockBody> {
+ val bodies = ArrayList<BlockBody>()
+ hashes.forEach { hash ->
+ repository.retrieveBlockBody(hash)?.let {
+ bodies.add(it)
+ }
+ }
+ return bodies
+ }
+
+ suspend fun findHeaders(block: Bytes, maxHeaders: Long, skip: Long, reverse: Boolean): List<BlockHeader> {
+ val matches = repository.findBlockByHashOrNumber(block)
+ val headers = ArrayList<BlockHeader>()
+ if (matches.isNotEmpty()) {
+ val header = repository.retrieveBlockHeader(matches[0])
+ header?.let {
+ headers.add(it)
+ var blockNumber = it.number
+ for (i in 2..maxHeaders) {
+ blockNumber = if (reverse) {
+ blockNumber.subtract(skip + 1)
+ } else {
+ blockNumber.add(skip + 1)
+ }
+ val nextMatches = repository.findBlockByHashOrNumber(blockNumber.toBytes())
+ if (nextMatches.isEmpty()) {
+ break
+ }
+ val nextHeader = repository.retrieveBlockHeader(nextMatches[0]) ?: break
+ headers.add(nextHeader)
+ }
+ }
+ }
+ return headers
+ }
+
+ suspend fun addNewBlockHashes(hashes: List<Pair<Hash, Long>>) {
+ hashes.forEach { pair ->
+ repository.retrieveBlockHeader(pair.first).takeIf { null == it }.apply {
+ requestBlockHeader(pair.first)
+ }
+ repository.retrieveBlockBody(pair.first).takeIf { null == it }.apply {
+ requestBlockBody(pair.first)
+ }
+ }
+ }
+
+ fun requestBlockHeader(blockHash: Hash) {
+ requestsManager.requestBlockHeader(blockHash)
+ }
+
+ private fun requestBlockBody(blockHash: Hash) {
+ requestsManager.requestBlockBody(blockHash)
+ }
+
+ suspend fun addNewBlockHeaders(connectionId: String, headers: List<BlockHeader>) {
+ headers.forEach { header ->
+ if (!requestsManager.wasRequested(connectionId, header)) {
+ return
+ }
+
+ repository.storeBlockHeader(header)
+ }
+ }
+}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index ae16b05..c7a5fcd 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -22,11 +22,6 @@ import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.CompletableAsyncCompletion
import org.apache.tuweni.concurrent.coroutines.asyncCompletion
-import org.apache.tuweni.eth.BlockBody
-import org.apache.tuweni.eth.BlockHeader
-import org.apache.tuweni.eth.Hash
-import org.apache.tuweni.eth.TransactionReceipt
-import org.apache.tuweni.eth.repository.BlockchainRepository
import org.apache.tuweni.rlpx.RLPxService
import org.apache.tuweni.rlpx.wire.DisconnectReason
import org.apache.tuweni.rlpx.wire.SubProtocolHandler
@@ -37,7 +32,7 @@ internal class EthHandler(
override val coroutineContext: CoroutineContext = Dispatchers.Default,
private val blockchainInfo: BlockchainInformation,
private val service: RLPxService,
- private val repository: BlockchainRepository
+ private val controller: EthController
) : SubProtocolHandler, CoroutineScope {
companion object {
@@ -45,8 +40,6 @@ internal class EthHandler(
}
val peersMap: MutableMap<String, PeerInfo> = mutableMapOf()
- val blockHeaderRequests = ArrayList<Hash>()
- val blockBodyRequests = ArrayList<Hash>()
override fun handle(connectionId: String, messageType: Int, message: Bytes) = asyncCompletion {
logger.debug("Receiving message of type {}", messageType)
@@ -72,6 +65,7 @@ internal class EthHandler(
service.disconnect(connectionId, DisconnectReason.SUBPROTOCOL_REASON)
}
peersMap[connectionId]?.connect()
+ // TODO send to controller.
}
// private fun handleReceipts(receipts: Receipts) {
@@ -79,11 +73,13 @@ internal class EthHandler(
// }
private suspend fun handleGetReceipts(connectionId: String, getReceipts: GetReceipts) {
- val receipts = ArrayList<List<TransactionReceipt>>()
- getReceipts.hashes.forEach {
- receipts.add(repository.retrieveTransactionReceipts(it))
- }
- service.send(EthSubprotocol.ETH64, MessageType.Receipts.code, connectionId, Receipts(receipts).toBytes())
+
+ service.send(
+ EthSubprotocol.ETH64,
+ MessageType.Receipts.code,
+ connectionId,
+ Receipts(controller.findTransactionReceipts(getReceipts.hashes)).toBytes()
+ )
}
private fun handleGetNodeData(connectionId: String, nodeData: GetNodeData) {
@@ -93,8 +89,7 @@ internal class EthHandler(
}
private suspend fun handleNewBlock(read: NewBlock) {
- repository.storeBlock(read.block)
- // TODO more to do there
+ controller.addNewBlock(read.block)
}
private fun handleBlockBodies(message: BlockBodies) {
@@ -108,70 +103,30 @@ internal class EthHandler(
}
private suspend fun handleGetBlockBodies(connectionId: String, message: GetBlockBodies) {
- val bodies = ArrayList<BlockBody>()
- message.hashes.forEach { hash ->
- repository.retrieveBlockBody(hash)?.let {
- bodies.add(it)
- }
- }
- service.send(EthSubprotocol.ETH64, 6, connectionId, BlockBodies(bodies).toBytes())
+ service.send(
+ EthSubprotocol.ETH64,
+ 6,
+ connectionId,
+ BlockBodies(controller.findBlockBodies(message.hashes)).toBytes()
+ )
}
private suspend fun handleHeaders(connectionId: String, headers: BlockHeaders) {
- connectionId.toString()
- headers.headers.forEach {
- repository.storeBlockHeader(it)
-// if (blockHeaderRequests.remove(it.hash)) {
-//
-// } else {
-// service.disconnect(connectionId, DisconnectReason.PROTOCOL_BREACH)
-// }
- }
+ controller.addNewBlockHeaders(connectionId, headers.headers)
}
private suspend fun handleGetBlockHeaders(connectionId: String, blockHeaderRequest: GetBlockHeaders) {
- val matches = repository.findBlockByHashOrNumber(blockHeaderRequest.block)
- val headers = ArrayList<BlockHeader>()
- if (matches.isNotEmpty()) {
- val header = repository.retrieveBlockHeader(matches[0])
- header?.let {
- headers.add(it)
- var blockNumber = it.number
- for (i in 2..blockHeaderRequest.maxHeaders) {
- blockNumber = if (blockHeaderRequest.reverse) {
- blockNumber.subtract(blockHeaderRequest.skip + 1)
- } else {
- blockNumber.add(blockHeaderRequest.skip + 1)
- }
- val nextMatches = repository.findBlockByHashOrNumber(blockNumber.toBytes())
- if (nextMatches.isEmpty()) {
- break
- }
- val nextHeader = repository.retrieveBlockHeader(nextMatches[0]) ?: break
- headers.add(nextHeader)
- }
- }
- }
+ val headers = controller.findHeaders(
+ blockHeaderRequest.block,
+ blockHeaderRequest.maxHeaders,
+ blockHeaderRequest.skip,
+ blockHeaderRequest.reverse
+ )
service.send(EthSubprotocol.ETH64, MessageType.BlockHeaders.code, connectionId, BlockHeaders(headers).toBytes())
}
private suspend fun handleNewBlockHashes(message: NewBlockHashes) {
- message.hashes.forEach { pair ->
- repository.retrieveBlockHeader(pair.first).takeIf { null == it }.apply {
- requestBlockHeader(pair.first)
- }
- repository.retrieveBlockBody(pair.first).takeIf { null == it }.apply {
- requestBlockBody(pair.first)
- }
- }
- }
-
- private fun requestBlockHeader(blockHash: Hash) {
- blockHeaderRequests.add(blockHash)
- }
-
- private fun requestBlockBody(blockHash: Hash) {
- blockBodyRequests.add(blockHash)
+ controller.addNewBlockHashes(message.hashes)
}
override fun handleNewPeerConnection(connectionId: String): AsyncCompletion {
@@ -196,6 +151,7 @@ internal class EthHandler(
class PeerInfo() {
val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
+ // TODO disconnect if not responding after timeout.
fun connect() {
ready.complete()
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
new file mode 100644
index 0000000..1234ab9
--- /dev/null
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.eth
+
+import org.apache.tuweni.eth.BlockBody
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+
+interface EthRequestsManager {
+ fun requestBlockHeader(blockHash: Hash)
+
+ fun requestBlockBody(blockHash: Hash)
+
+ fun requestBlock(blockHash: Hash)
+
+ fun wasRequested(connectionId: String, header: BlockHeader): Boolean
+ fun wasRequested(connectionId: String, header: BlockBody): Boolean
+}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
index 8c4efb0..e30e99c 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
@@ -27,7 +27,8 @@ import kotlin.coroutines.CoroutineContext
class EthSubprotocol(
private val coroutineContext: CoroutineContext = Dispatchers.Default,
private val blockchainInfo: BlockchainInformation,
- private val repository: BlockchainRepository
+ private val repository: BlockchainRepository,
+ private val requestsManager: EthRequestsManager
) : SubProtocol {
companion object {
val ETH62 = SubProtocolIdentifier.of("eth", 62)
@@ -49,8 +50,10 @@ class EthSubprotocol(
}
}
- override fun createHandler(service: RLPxService): SubProtocolHandler =
- EthHandler(coroutineContext, blockchainInfo, service, repository)
+ override fun createHandler(service: RLPxService): SubProtocolHandler {
+ val controller = EthController(repository, requestsManager)
+ return EthHandler(coroutineContext, blockchainInfo, service, controller)
+ }
override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64)
}
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
index 870515a..714ea99 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
@@ -109,6 +109,7 @@ class EthHandlerTest {
genesisBlock
)
service = mock(RLPxService::class.java)
+ val requestsManager = mock(EthRequestsManager::class.java)
handler = EthHandler(
blockchainInfo = SimpleBlockchainInformation(
UInt256.valueOf(42L),
@@ -118,7 +119,7 @@ class EthHandlerTest {
emptyList()
),
service = service,
- repository = repository
+ controller = EthController(repository, requestsManager)
)
for (i in 1..10) {
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
index 5eb8500..5cc07ef 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
+import org.mockito.Mockito.mock
@ExtendWith(BouncyCastleExtension::class, VertxExtension::class, LuceneIndexWriterExtension::class)
class EthSubprotocolTest {
@@ -54,7 +55,11 @@ class EthSubprotocolTest {
MapKeyValueStore(),
BlockchainIndex(writer)
)
- val eth = EthSubprotocol(blockchainInfo = blockchainInfo, repository = repository)
+ val eth = EthSubprotocol(
+ blockchainInfo = blockchainInfo,
+ repository = repository,
+ requestsManager = mock(EthRequestsManager::class.java)
+ )
assertEquals(SubProtocolIdentifier.of("eth", 64), eth.id())
}
@@ -67,7 +72,11 @@ class EthSubprotocolTest {
MapKeyValueStore(),
BlockchainIndex(writer)
)
- val eth = EthSubprotocol(blockchainInfo = blockchainInfo, repository = repository)
+ val eth = EthSubprotocol(
+ blockchainInfo = blockchainInfo,
+ repository = repository,
+ requestsManager = mock(EthRequestsManager::class.java)
+ )
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 64)))
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 63)))
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 62)))
@@ -84,7 +93,11 @@ class EthSubprotocolTest {
MapKeyValueStore(),
BlockchainIndex(writer)
)
- val eth = EthSubprotocol(blockchainInfo = blockchainInfo, repository = repository)
+ val eth = EthSubprotocol(
+ blockchainInfo = blockchainInfo,
+ repository = repository,
+ requestsManager = mock(EthRequestsManager::class.java)
+ )
assertEquals(8, eth.versionRange(62))
assertEquals(17, eth.versionRange(63))
assertEquals(17, eth.versionRange(64))
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index 6cb0887..73dbf32 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -97,25 +97,6 @@ class BlockchainRepository
}
/**
- * Store all the transaction receipts of a block in the repository.
- *
- * Transaction receipts should be ordered by the transactions order of the block.
- *
- * @param transactionReceipts the transaction receipts to store
- * @param txHash the hash of the transaction
- * @param blockHash the hash of the block that this transaction belongs to
- */
- suspend fun storeTransactionReceipts(
- vararg transactionReceipts: TransactionReceipt,
- txHash: Bytes,
- blockHash: Bytes
- ) {
- for (i in 0 until transactionReceipts.size) {
- storeTransactionReceipt(transactionReceipts[i], i, txHash, blockHash)
- }
- }
-
- /**
* Stores a transaction receipt in the repository.
*
* @param transactionReceipt the transaction receipt to store
@@ -140,7 +121,7 @@ class BlockchainRepository
* @return handle to the storage operation completion
*/
suspend fun storeBlockHeader(header: BlockHeader) {
- blockHeaderStore.put(header.getHash(), header.toBytes())
+ blockHeaderStore.put(header.hash, header.toBytes())
indexBlockHeader(header)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org