You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/03 08:46:41 UTC

[incubator-tuweni] branch master updated: Add a controller and a requests manager

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c856d5  Add a controller and a requests manager
3c856d5 is described below

commit 3c856d57b7fb1751500ad6ac60222f1081b96453
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed Jun 3 01:45:05 2020 -0700

    Add a controller and a requests manager
---
 .../org/apache/tuweni/devp2p/eth/EthController.kt  | 105 +++++++++++++++++++++
 .../org/apache/tuweni/devp2p/eth/EthHandler.kt     |  94 +++++-------------
 .../apache/tuweni/devp2p/eth/EthRequestsManager.kt |  32 +++++++
 .../org/apache/tuweni/devp2p/eth/EthSubprotocol.kt |   9 +-
 .../org/apache/tuweni/devp2p/eth/EthHandlerTest.kt |   3 +-
 .../apache/tuweni/devp2p/eth/EthSubprotocolTest.kt |  19 +++-
 .../tuweni/eth/repository/BlockchainRepository.kt  |  21 +----
 7 files changed, 187 insertions(+), 96 deletions(-)

diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
new file mode 100644
index 0000000..005c33d
--- /dev/null
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.eth
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.eth.Block
+import org.apache.tuweni.eth.BlockBody
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.TransactionReceipt
+import org.apache.tuweni.eth.repository.BlockchainRepository
+
+/** Bridges eth subprotocol messages to [repository] reads and writes and to [requestsManager] requests. */
+class EthController(val repository: BlockchainRepository, val requestsManager: EthRequestsManager) {
+
+  /** Returns the receipts stored for each of [hashes], one inner list per hash, in the same order. */
+  suspend fun findTransactionReceipts(hashes: List<Hash>): List<List<TransactionReceipt>> =
+    hashes.map { repository.retrieveTransactionReceipts(it) }
+
+  /** Stores [newBlock] in the repository. */
+  suspend fun addNewBlock(newBlock: Block) {
+    repository.storeBlock(newBlock)
+  }
+
+  /** Returns the bodies found for [hashes]; hashes with no stored body are left out of the result. */
+  suspend fun findBlockBodies(hashes: List<Hash>): List<BlockBody> =
+    hashes.mapNotNull { repository.retrieveBlockBody(it) }
+
+  /**
+   * Collects block headers starting from [block], then every `skip + 1` block numbers after it, or before it
+   * when [reverse] is true. The walk ends at the first lookup that finds nothing.
+   *
+   * @param block the hash or number of the first block
+   * @param maxHeaders the most headers to return; the first header is still returned when this is below 1
+   * @param skip the count of block numbers to jump over between two returned headers
+   * @param reverse true to walk towards lower block numbers
+   * @return the headers found, in lookup order; empty if the first block cannot be found
+   */
+  suspend fun findHeaders(block: Bytes, maxHeaders: Long, skip: Long, reverse: Boolean): List<BlockHeader> {
+    val headers = ArrayList<BlockHeader>()
+    val firstHash = repository.findBlockByHashOrNumber(block).firstOrNull() ?: return headers
+    val firstHeader = repository.retrieveBlockHeader(firstHash) ?: return headers
+    headers.add(firstHeader)
+    var blockNumber = firstHeader.number
+    for (i in 2..maxHeaders) {
+      blockNumber = if (reverse) blockNumber.subtract(skip + 1) else blockNumber.add(skip + 1)
+      val nextHash = repository.findBlockByHashOrNumber(blockNumber.toBytes()).firstOrNull() ?: break
+      val nextHeader = repository.retrieveBlockHeader(nextHash) ?: break
+      headers.add(nextHeader)
+    }
+    return headers
+  }
+
+  /**
+   * Requests the header and the body of every announced block that the repository does not hold yet.
+   * Only the first element of each pair in [hashes], the block hash, is read.
+   */
+  suspend fun addNewBlockHashes(hashes: List<Pair<Hash, Long>>) {
+    for ((hash, _) in hashes) {
+      // Plain null checks: `takeIf { null == it }.apply { ... }` ran its block even for blocks already stored,
+      // because `apply` is invoked on a null receiver just as it is on a non-null one.
+      if (repository.retrieveBlockHeader(hash) == null) {
+        requestBlockHeader(hash)
+      }
+      if (repository.retrieveBlockBody(hash) == null) {
+        requestBlockBody(hash)
+      }
+    }
+  }
+
+  /** Asks [requestsManager] to request the header of the block with hash [blockHash]. */
+  fun requestBlockHeader(blockHash: Hash) {
+    requestsManager.requestBlockHeader(blockHash)
+  }
+
+  /** Asks [requestsManager] to request the body of the block with hash [blockHash]. */
+  private fun requestBlockBody(blockHash: Hash) {
+    requestsManager.requestBlockBody(blockHash)
+  }
+
+  /**
+   * Stores [headers] received on [connectionId] for as long as [requestsManager] reports each as requested.
+   * The first header that was not requested ends the call, so the headers after it are not stored either.
+   */
+  suspend fun addNewBlockHeaders(connectionId: String, headers: List<BlockHeader>) {
+    for (header in headers) {
+      if (!requestsManager.wasRequested(connectionId, header)) return
+      repository.storeBlockHeader(header)
+    }
+  }
+}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index ae16b05..c7a5fcd 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -22,11 +22,6 @@ import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion
 import org.apache.tuweni.concurrent.coroutines.asyncCompletion
-import org.apache.tuweni.eth.BlockBody
-import org.apache.tuweni.eth.BlockHeader
-import org.apache.tuweni.eth.Hash
-import org.apache.tuweni.eth.TransactionReceipt
-import org.apache.tuweni.eth.repository.BlockchainRepository
 import org.apache.tuweni.rlpx.RLPxService
 import org.apache.tuweni.rlpx.wire.DisconnectReason
 import org.apache.tuweni.rlpx.wire.SubProtocolHandler
@@ -37,7 +32,7 @@ internal class EthHandler(
   override val coroutineContext: CoroutineContext = Dispatchers.Default,
   private val blockchainInfo: BlockchainInformation,
   private val service: RLPxService,
-  private val repository: BlockchainRepository
+  private val controller: EthController
 ) : SubProtocolHandler, CoroutineScope {
 
   companion object {
@@ -45,8 +40,6 @@ internal class EthHandler(
   }
 
   val peersMap: MutableMap<String, PeerInfo> = mutableMapOf()
-  val blockHeaderRequests = ArrayList<Hash>()
-  val blockBodyRequests = ArrayList<Hash>()
 
   override fun handle(connectionId: String, messageType: Int, message: Bytes) = asyncCompletion {
     logger.debug("Receiving message of type {}", messageType)
@@ -72,6 +65,7 @@ internal class EthHandler(
       service.disconnect(connectionId, DisconnectReason.SUBPROTOCOL_REASON)
     }
     peersMap[connectionId]?.connect()
+    // TODO send to controller.
   }
 
 //  private fun handleReceipts(receipts: Receipts) {
@@ -79,11 +73,13 @@ internal class EthHandler(
 //  }
 
   private suspend fun handleGetReceipts(connectionId: String, getReceipts: GetReceipts) {
-    val receipts = ArrayList<List<TransactionReceipt>>()
-    getReceipts.hashes.forEach {
-      receipts.add(repository.retrieveTransactionReceipts(it))
-    }
-    service.send(EthSubprotocol.ETH64, MessageType.Receipts.code, connectionId, Receipts(receipts).toBytes())
+
+    service.send(
+      EthSubprotocol.ETH64,
+      MessageType.Receipts.code,
+      connectionId,
+      Receipts(controller.findTransactionReceipts(getReceipts.hashes)).toBytes()
+    )
   }
 
   private fun handleGetNodeData(connectionId: String, nodeData: GetNodeData) {
@@ -93,8 +89,7 @@ internal class EthHandler(
   }
 
   private suspend fun handleNewBlock(read: NewBlock) {
-    repository.storeBlock(read.block)
-    // TODO more to do there
+    controller.addNewBlock(read.block)
   }
 
   private fun handleBlockBodies(message: BlockBodies) {
@@ -108,70 +103,30 @@ internal class EthHandler(
   }
 
   private suspend fun handleGetBlockBodies(connectionId: String, message: GetBlockBodies) {
-    val bodies = ArrayList<BlockBody>()
-    message.hashes.forEach { hash ->
-      repository.retrieveBlockBody(hash)?.let {
-        bodies.add(it)
-      }
-    }
-    service.send(EthSubprotocol.ETH64, 6, connectionId, BlockBodies(bodies).toBytes())
+    service.send(
+      EthSubprotocol.ETH64,
+      6,
+      connectionId,
+      BlockBodies(controller.findBlockBodies(message.hashes)).toBytes()
+    )
   }
 
   private suspend fun handleHeaders(connectionId: String, headers: BlockHeaders) {
-    connectionId.toString()
-    headers.headers.forEach {
-      repository.storeBlockHeader(it)
-//      if (blockHeaderRequests.remove(it.hash)) {
-//
-//      } else {
-//        service.disconnect(connectionId, DisconnectReason.PROTOCOL_BREACH)
-//      }
-    }
+    controller.addNewBlockHeaders(connectionId, headers.headers)
   }
 
   private suspend fun handleGetBlockHeaders(connectionId: String, blockHeaderRequest: GetBlockHeaders) {
-    val matches = repository.findBlockByHashOrNumber(blockHeaderRequest.block)
-    val headers = ArrayList<BlockHeader>()
-    if (matches.isNotEmpty()) {
-      val header = repository.retrieveBlockHeader(matches[0])
-      header?.let {
-        headers.add(it)
-        var blockNumber = it.number
-        for (i in 2..blockHeaderRequest.maxHeaders) {
-          blockNumber = if (blockHeaderRequest.reverse) {
-            blockNumber.subtract(blockHeaderRequest.skip + 1)
-          } else {
-            blockNumber.add(blockHeaderRequest.skip + 1)
-          }
-          val nextMatches = repository.findBlockByHashOrNumber(blockNumber.toBytes())
-          if (nextMatches.isEmpty()) {
-            break
-          }
-          val nextHeader = repository.retrieveBlockHeader(nextMatches[0]) ?: break
-          headers.add(nextHeader)
-        }
-      }
-    }
+    val headers = controller.findHeaders(
+      blockHeaderRequest.block,
+      blockHeaderRequest.maxHeaders,
+      blockHeaderRequest.skip,
+      blockHeaderRequest.reverse
+    )
     service.send(EthSubprotocol.ETH64, MessageType.BlockHeaders.code, connectionId, BlockHeaders(headers).toBytes())
   }
 
   private suspend fun handleNewBlockHashes(message: NewBlockHashes) {
-    message.hashes.forEach { pair ->
-      repository.retrieveBlockHeader(pair.first).takeIf { null == it }.apply {
-        requestBlockHeader(pair.first)
-      }
-      repository.retrieveBlockBody(pair.first).takeIf { null == it }.apply {
-        requestBlockBody(pair.first)
-      }
-    }
-  }
-
-  private fun requestBlockHeader(blockHash: Hash) {
-    blockHeaderRequests.add(blockHash)
-  }
-
-  private fun requestBlockBody(blockHash: Hash) {
-    blockBodyRequests.add(blockHash)
+    controller.addNewBlockHashes(message.hashes)
   }
 
   override fun handleNewPeerConnection(connectionId: String): AsyncCompletion {
@@ -196,6 +151,7 @@ internal class EthHandler(
 class PeerInfo() {
 
   val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
+  // TODO disconnect if not responding after timeout.
 
   fun connect() {
     ready.complete()
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
new file mode 100644
index 0000000..1234ab9
--- /dev/null
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.eth
+
+import org.apache.tuweni.eth.BlockBody
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+
+/** Contract [EthController] uses to request block data and to check that received data was requested. */
+interface EthRequestsManager {
+  // Requests keyed by block hash; what an implementation does with them is not defined here.
+  fun requestBlockHeader(blockHash: Hash)
+  fun requestBlockBody(blockHash: Hash)
+  fun requestBlock(blockHash: Hash)
+  // NOTE(review): presumably true when the item was requested on connectionId; confirm with implementers.
+  fun wasRequested(connectionId: String, header: BlockHeader): Boolean
+  fun wasRequested(connectionId: String, header: BlockBody): Boolean
+}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
index 8c4efb0..e30e99c 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
@@ -27,7 +27,8 @@ import kotlin.coroutines.CoroutineContext
 class EthSubprotocol(
   private val coroutineContext: CoroutineContext = Dispatchers.Default,
   private val blockchainInfo: BlockchainInformation,
-  private val repository: BlockchainRepository
+  private val repository: BlockchainRepository,
+  private val requestsManager: EthRequestsManager
 ) : SubProtocol {
   companion object {
     val ETH62 = SubProtocolIdentifier.of("eth", 62)
@@ -49,8 +50,10 @@ class EthSubprotocol(
     }
   }
 
-  override fun createHandler(service: RLPxService): SubProtocolHandler =
-    EthHandler(coroutineContext, blockchainInfo, service, repository)
+  override fun createHandler(service: RLPxService): SubProtocolHandler {
+    val controller = EthController(repository, requestsManager)
+    return EthHandler(coroutineContext, blockchainInfo, service, controller)
+  }
 
   override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64)
 }
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
index 870515a..714ea99 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt
@@ -109,6 +109,7 @@ class EthHandlerTest {
         genesisBlock
       )
       service = mock(RLPxService::class.java)
+      val requestsManager = mock(EthRequestsManager::class.java)
       handler = EthHandler(
         blockchainInfo = SimpleBlockchainInformation(
           UInt256.valueOf(42L),
@@ -118,7 +119,7 @@ class EthHandlerTest {
           emptyList()
         ),
         service = service,
-        repository = repository
+        controller = EthController(repository, requestsManager)
       )
 
       for (i in 1..10) {
diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
index 5eb8500..5cc07ef 100644
--- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
+++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Assertions.assertFalse
 import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
+import org.mockito.Mockito.mock
 
 @ExtendWith(BouncyCastleExtension::class, VertxExtension::class, LuceneIndexWriterExtension::class)
 class EthSubprotocolTest {
@@ -54,7 +55,11 @@ class EthSubprotocolTest {
       MapKeyValueStore(),
       BlockchainIndex(writer)
     )
-    val eth = EthSubprotocol(blockchainInfo = blockchainInfo, repository = repository)
+    val eth = EthSubprotocol(
+      blockchainInfo = blockchainInfo,
+      repository = repository,
+      requestsManager = mock(EthRequestsManager::class.java)
+    )
     assertEquals(SubProtocolIdentifier.of("eth", 64), eth.id())
   }
 
@@ -67,7 +72,11 @@ class EthSubprotocolTest {
       MapKeyValueStore(),
       BlockchainIndex(writer)
     )
-    val eth = EthSubprotocol(blockchainInfo = blockchainInfo, repository = repository)
+    val eth = EthSubprotocol(
+      blockchainInfo = blockchainInfo,
+      repository = repository,
+      requestsManager = mock(EthRequestsManager::class.java)
+    )
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 64)))
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 63)))
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 62)))
@@ -84,7 +93,11 @@ class EthSubprotocolTest {
       MapKeyValueStore(),
       BlockchainIndex(writer)
     )
-    val eth = EthSubprotocol(blockchainInfo = blockchainInfo, repository = repository)
+    val eth = EthSubprotocol(
+      blockchainInfo = blockchainInfo,
+      repository = repository,
+      requestsManager = mock(EthRequestsManager::class.java)
+    )
     assertEquals(8, eth.versionRange(62))
     assertEquals(17, eth.versionRange(63))
     assertEquals(17, eth.versionRange(64))
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index 6cb0887..73dbf32 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -97,25 +97,6 @@ class BlockchainRepository
   }
 
   /**
-   * Store all the transaction receipts of a block in the repository.
-   *
-   * Transaction receipts should be ordered by the transactions order of the block.
-   *
-   * @param transactionReceipts the transaction receipts to store
-   * @param txHash the hash of the transaction
-   * @param blockHash the hash of the block that this transaction belongs to
-   */
-  suspend fun storeTransactionReceipts(
-    vararg transactionReceipts: TransactionReceipt,
-    txHash: Bytes,
-    blockHash: Bytes
-  ) {
-    for (i in 0 until transactionReceipts.size) {
-      storeTransactionReceipt(transactionReceipts[i], i, txHash, blockHash)
-    }
-  }
-
-  /**
    * Stores a transaction receipt in the repository.
    *
    * @param transactionReceipt the transaction receipt to store
@@ -140,7 +121,7 @@ class BlockchainRepository
    * @return handle to the storage operation completion
    */
   suspend fun storeBlockHeader(header: BlockHeader) {
-    blockHeaderStore.put(header.getHash(), header.toBytes())
+    blockHeaderStore.put(header.hash, header.toBytes())
     indexBlockHeader(header)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org