You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/07 06:25:45 UTC

[incubator-tuweni] branch master updated: Implement all eth messages

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 722519c  Implement all eth messages
722519c is described below

commit 722519c420d443f0f777c3f0f88da6f4a73bdcf1
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Jun 6 23:25:27 2020 -0700

    Implement all eth messages
---
 .../tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt  |  3 +
 .../org/apache/tuweni/devp2p/eth/EthClient.kt      | 71 ++++++++++++++++------
 .../org/apache/tuweni/devp2p/eth/EthController.kt  | 46 +++++++++++++-
 .../org/apache/tuweni/devp2p/eth/EthHandler.kt     | 31 +++++++---
 .../apache/tuweni/devp2p/eth/EthRequestsManager.kt | 16 +++--
 .../org/apache/tuweni/devp2p/eth/Messages.kt       | 48 ++++++++++++++-
 .../tuweni/eth/repository/BlockchainIndex.kt       | 13 ++++
 .../tuweni/eth/repository/BlockchainRepository.kt  | 42 +++++++++++--
 8 files changed, 228 insertions(+), 42 deletions(-)

diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
index 73d195a..11ea363 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
@@ -58,6 +58,7 @@ class ConnectToAnotherNodeTest {
       MapKeyValueStore(),
       MapKeyValueStore(),
       MapKeyValueStore(),
+      MapKeyValueStore(),
       BlockchainIndex(writer),
       genesisBlock
     )
@@ -111,6 +112,7 @@ class ConnectToAnotherNodeTest {
       MapKeyValueStore(),
       MapKeyValueStore(),
       MapKeyValueStore(),
+      MapKeyValueStore(),
       BlockchainIndex(writer),
       genesisBlock
     )
@@ -140,6 +142,7 @@ class ConnectToAnotherNodeTest {
       MapKeyValueStore(),
       MapKeyValueStore(),
       MapKeyValueStore(),
+      MapKeyValueStore(),
       BlockchainIndex(writer),
       genesisBlock
     )
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
index d97bbed..8894375 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
@@ -16,18 +16,45 @@
  */
 package org.apache.tuweni.devp2p.eth
 
+import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.bytes.Bytes32
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion
 import org.apache.tuweni.eth.BlockBody
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.rlpx.RLPxService
 import org.apache.tuweni.rlpx.wire.SubProtocolClient
 import org.apache.tuweni.units.bigints.UInt256
 
 class EthClient(private val service: RLPxService) : EthRequestsManager, SubProtocolClient {
 
+  private val headerRequests = HashMap<Bytes32, Request>()
+  private val bodiesRequests = HashMap<String, Request>()
+  private val nodeDataRequests = HashMap<String, Request>()
+  private val transactionReceiptRequests = HashMap<String, Request>()
+
+  override fun requestTransactionReceipts(blockHashes: List<Hash>): AsyncCompletion {
+    val conns = service.repository().asIterable(EthSubprotocol.ETH62)
+    val handle = AsyncCompletion.incomplete()
+    conns.forEach { conn ->
+      var done = false
+      transactionReceiptRequests.computeIfAbsent(conn.id()) {
+        service.send(
+          EthSubprotocol.ETH62,
+          MessageType.GetReceipts.code,
+          conn.id(),
+          GetReceipts(blockHashes).toBytes()
+        )
+        done = true
+        Request(conn.id(), handle, blockHashes)
+      }
+      return handle
+    }
+    throw RuntimeException("No connection available")
+  }
+
   override fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion {
     val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
     val completion = AsyncCompletion.incomplete()
@@ -38,14 +65,11 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
         conn!!.id(),
         GetBlockHeaders(blockHash, maxHeaders, skip, reverse).toBytes()
       )
-      BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+      Request(connectionId = conn.id(), handle = completion, data = blockHash)
     }
     return completion
   }
 
-  private val headerRequests = HashMap<Bytes32, BlockHeaderRequest>()
-  private val bodiesRequests = HashMap<String, List<Hash>>()
-
   override fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion {
     val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
     val blockNumberBytes = UInt256.valueOf(blockNumber).toBytes()
@@ -57,7 +81,7 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
         conn!!.id(),
         GetBlockHeaders(blockNumberBytes, maxHeaders, skip, reverse).toBytes()
       )
-      BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+      Request(connectionId = conn.id(), handle = completion, data = blockNumber)
     }
     return completion
   }
@@ -76,32 +100,37 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
         conn!!.id(),
         GetBlockHeaders(blockHash, 1, 0, false).toBytes()
       )
-      BlockHeaderRequest(connectionId = conn.id(), handle = completion)
+      Request(connectionId = conn.id(), handle = completion, data = blockHash)
     }
     return completion
   }
 
-  override fun requestBlockBodies(blockHashes: List<Hash>) {
+  override fun requestBlockBodies(blockHashes: List<Hash>): AsyncCompletion {
     val conns = service.repository().asIterable(EthSubprotocol.ETH62)
+    val handle = AsyncCompletion.incomplete()
     conns.forEach { conn ->
-      if (bodiesRequests.computeIfAbsent(conn.id()) {
+      var done = false
+      bodiesRequests.computeIfAbsent(conn.id()) {
         service.send(
           EthSubprotocol.ETH62,
           MessageType.GetBlockBodies.code,
           conn.id(),
           GetBlockBodies(blockHashes).toBytes()
         )
-        blockHashes
-      } == blockHashes) {
-        return
+        done = true
+        Request(conn.id(), handle, blockHashes)
+      }
+      if (done) {
+        return handle
       }
     }
+    throw RuntimeException("No connection available")
   }
 
-  override fun requestBlock(blockHash: Hash) {
-    requestBlockHeader(blockHash)
+  override fun requestBlock(blockHash: Hash): AsyncCompletion = AsyncCompletion.allOf(
+    requestBlockHeader(blockHash),
     requestBlockBodies(listOf(blockHash))
-  }
+  )
 
   override fun wasRequested(connectionId: String, header: BlockHeader): CompletableAsyncCompletion? {
     val request = headerRequests.remove(header.hash) ?: return null
@@ -112,8 +141,16 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
     }
   }
 
-  override fun wasRequested(connectionId: String, bodies: List<BlockBody>): List<Hash>? =
-    bodiesRequests.get(connectionId)
+  override fun wasRequested(connectionId: String, bodies: List<BlockBody>): Request? =
+    bodiesRequests[connectionId]
+
+  override fun nodeDataWasRequested(connectionId: String, elements: List<Bytes?>): Request? =
+    nodeDataRequests[connectionId]
+
+  override fun transactionRequestsWasRequested(
+    connectionId: String,
+    transactionReceipts: List<List<TransactionReceipt>>
+  ): Request? = transactionReceiptRequests[connectionId]
 }
 
-internal data class BlockHeaderRequest(val connectionId: String, val handle: CompletableAsyncCompletion)
+data class Request(val connectionId: String, val handle: CompletableAsyncCompletion, val data: Any)
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
index e5a0c02..243608c 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -21,6 +21,7 @@ import org.apache.tuweni.eth.Block
 import org.apache.tuweni.eth.BlockBody
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.Transaction
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.eth.repository.BlockchainRepository
 import org.apache.tuweni.units.bigints.UInt256
@@ -117,8 +118,9 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
   }
 
   suspend fun addNewBlockBodies(connectionId: String, bodies: List<BlockBody>) {
-    val hashes = requestsManager.wasRequested(connectionId, bodies)
-    if (hashes != null) {
+    val request = requestsManager.wasRequested(connectionId, bodies)
+    if (request != null) {
+      val hashes = request.data as List<Hash>
       for (i in 0..hashes.size) {
         repository.storeBlockBody(hashes[i], bodies[i])
       }
@@ -131,4 +133,44 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
       requestsManager.requestBlockHeaders(status.bestHash, 100, 5, true)
     }
   }
+
+  suspend fun findNodeData(hashes: List<Hash>) = repository.retrieveNodeData(hashes)
+
+  suspend fun addNewNodeData(connectionId: String, elements: List<Bytes?>) {
+    val request = requestsManager.nodeDataWasRequested(connectionId, elements)
+    if (request != null) {
+      val hashes = request.data as List<Hash>
+      for (i in 0..hashes.size) {
+        val elt = elements[i]
+        if (elt != null) {
+          repository.storeNodeData(hashes[i], elt)
+        }
+      }
+    }
+  }
+
+  suspend fun addNewTransactionReceipts(connectionId: String, transactionReceipts: List<List<TransactionReceipt>>) {
+    val request = requestsManager.transactionRequestsWasRequested(connectionId, transactionReceipts)
+    if (request != null) {
+      val hashes = request.data as List<Hash>
+      for (i in 0..hashes.size) {
+        val blockBody = repository.retrieveBlockBody(hashes[i])
+        val blockReceipts = transactionReceipts[i]
+        for (j in 0..blockReceipts.size) {
+          repository.storeTransactionReceipt(
+            blockReceipts[j],
+            j,
+            blockBody?.transactions?.get(j)?.hash ?: Bytes.EMPTY,
+            hashes[i]
+          )
+        }
+      }
+    }
+  }
+
+  suspend fun addNewTransactions(transactions: List<Transaction>) {
+    transactions.forEach {
+      repository.storeTransaction(it)
+    }
+  }
 }
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index fd95aca..c1810f4 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -46,19 +46,27 @@ internal class EthHandler(
     when (messageType) {
       MessageType.Status.code -> handleStatus(connectionId, StatusMessage.read(message))
       MessageType.NewBlockHashes.code -> handleNewBlockHashes(NewBlockHashes.read(message))
-//    Transactions.code -> // do nothing.
+      MessageType.Transactions.code -> handleTransactions(Transactions.read(message))
       MessageType.GetBlockHeaders.code -> handleGetBlockHeaders(connectionId, GetBlockHeaders.read(message))
       MessageType.BlockHeaders.code -> handleHeaders(connectionId, BlockHeaders.read(message))
       MessageType.GetBlockBodies.code -> handleGetBlockBodies(connectionId, GetBlockBodies.read(message))
       MessageType.BlockBodies.code -> handleBlockBodies(connectionId, BlockBodies.read(message))
       MessageType.NewBlock.code -> handleNewBlock(NewBlock.read(message))
       MessageType.GetNodeData.code -> handleGetNodeData(connectionId, GetNodeData.read(message))
-//    MessageType.NodeData.code-> // not implemented yet.
+      MessageType.NodeData.code -> handleNodeData(connectionId, NodeData.read(message))
       MessageType.GetReceipts.code -> handleGetReceipts(connectionId, GetReceipts.read(message))
-      // MessageType.Receipts.code -> handleReceipts(Receipts.read(message)) // not implemented yet
+      MessageType.Receipts.code -> handleReceipts(connectionId, Receipts.read(message))
     }
   }
 
+  private suspend fun handleTransactions(transactions: Transactions) {
+    controller.addNewTransactions(transactions.transactions)
+  }
+
+  private suspend fun handleNodeData(connectionId: String, read: NodeData) {
+    controller.addNewNodeData(connectionId, read.elements)
+  }
+
   private suspend fun handleStatus(connectionId: String, status: StatusMessage) {
     if (!status.networkID.equals(blockchainInfo.networkID()) ||
       !status.genesisHash.equals(blockchainInfo.genesisHash())) {
@@ -69,9 +77,9 @@ internal class EthHandler(
     controller.receiveStatus(connectionId, status)
   }
 
-//  private fun handleReceipts(receipts: Receipts) {
-//    repository.storeTransactionReceipts()
-//  }
+  private suspend fun handleReceipts(connectionId: String, receipts: Receipts) {
+    controller.addNewTransactionReceipts(connectionId, receipts.transactionReceipts)
+  }
 
   private suspend fun handleGetReceipts(connectionId: String, getReceipts: GetReceipts) {
 
@@ -83,10 +91,13 @@ internal class EthHandler(
     )
   }
 
-  private fun handleGetNodeData(connectionId: String, nodeData: GetNodeData) {
-    // TODO implement
-    nodeData.toBytes()
-    service.send(EthSubprotocol.ETH64, MessageType.NodeData.code, connectionId, NodeData(emptyList()).toBytes())
+  private suspend fun handleGetNodeData(connectionId: String, nodeData: GetNodeData) {
+    service.send(
+      EthSubprotocol.ETH64,
+      MessageType.NodeData.code,
+      connectionId,
+      NodeData(controller.findNodeData(nodeData.hashes)).toBytes()
+    )
   }
 
   private suspend fun handleNewBlock(read: NewBlock) {
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
index 5ea03ce..34f5847 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -16,22 +16,28 @@
  */
 package org.apache.tuweni.devp2p.eth
 
+import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion
 import org.apache.tuweni.eth.BlockBody
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.TransactionReceipt
 
 interface EthRequestsManager {
   fun requestBlockHeader(blockHash: Hash): AsyncCompletion
   fun requestBlockHeaders(blockHashes: List<Hash>): AsyncCompletion
   fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
   fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
-
-  fun requestBlockBodies(blockHashes: List<Hash>)
-
-  fun requestBlock(blockHash: Hash)
+  fun requestBlockBodies(blockHashes: List<Hash>): AsyncCompletion
+  fun requestBlock(blockHash: Hash): AsyncCompletion
+  fun requestTransactionReceipts(blockHashes: List<Hash>): AsyncCompletion
 
   fun wasRequested(connectionId: String, header: BlockHeader): CompletableAsyncCompletion?
-  fun wasRequested(connectionId: String, bodies: List<BlockBody>): List<Hash>?
+  fun wasRequested(connectionId: String, bodies: List<BlockBody>): Request?
+  fun nodeDataWasRequested(connectionId: String, elements: List<Bytes?>): Request?
+  fun transactionRequestsWasRequested(
+    connectionId: String,
+    transactionReceipts: List<List<TransactionReceipt>>
+  ): Request?
 }
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/Messages.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/Messages.kt
index 356b043..47a526c 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/Messages.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/Messages.kt
@@ -21,6 +21,7 @@ import org.apache.tuweni.eth.Block
 import org.apache.tuweni.eth.BlockBody
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.Transaction
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.rlp.RLP
 import org.apache.tuweni.units.bigints.UInt256
@@ -228,8 +229,31 @@ data class GetNodeData(val hashes: List<Hash>) {
   }
 }
 
-data class NodeData(val elements: List<Any>) {
-  fun toBytes(): Bytes = RLP.encodeList { _ ->
+data class NodeData(val elements: List<Bytes?>) {
+  companion object {
+
+    fun read(payload: Bytes): NodeData = RLP.decodeList(payload) {
+      val elements = ArrayList<Bytes?>()
+      while (!it.isComplete) {
+        val value = it.readValue()
+        if (value == Bytes.EMPTY) {
+          elements.add(null)
+        } else {
+          elements.add(value)
+        }
+      }
+      NodeData(elements)
+    }
+  }
+
+  fun toBytes(): Bytes = RLP.encodeList { writer ->
+    elements.forEach {
+      if (it == null) {
+        writer.writeValue(Bytes.EMPTY)
+      } else {
+        writer.writeValue(it)
+      }
+    }
   }
 }
 
@@ -280,3 +304,23 @@ data class Receipts(val transactionReceipts: List<List<TransactionReceipt>>) {
     }
   }
 }
+
+data class Transactions(val transactions: List<Transaction>) {
+  companion object {
+
+    fun read(payload: Bytes): Transactions = RLP.decodeList(payload) {
+      val transactions = ArrayList<Transaction>()
+      while (!it.isComplete) {
+        val tx = Transaction.readFrom(it)
+        transactions.add(tx)
+      }
+      Transactions(transactions)
+    }
+  }
+
+  fun toBytes(): Bytes = RLP.encodeList { writer ->
+    transactions.forEach {
+      it.writeTo(writer)
+    }
+  }
+}
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
index 927b25e..cc96578 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
@@ -40,6 +40,7 @@ import org.apache.tuweni.bytes.Bytes32
 import org.apache.tuweni.eth.Address
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.Transaction
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.eth.repository.BlockHeaderFields.COINBASE
 import org.apache.tuweni.eth.repository.BlockHeaderFields.DIFFICULTY
@@ -271,6 +272,13 @@ interface BlockchainIndexWriter {
    * @param blockHash the hash of the block
    */
   fun indexTransactionReceipt(txReceipt: TransactionReceipt, txIndex: Int, txHash: Bytes, blockHash: Bytes)
+
+  /**
+   * Indexes a transaction.
+   *
+   * @param transaction the transaction to index
+   */
+  fun indexTransaction(transaction: Transaction)
 }
 
 /**
@@ -287,6 +295,7 @@ internal class IndexWriteException(e: Exception) : RuntimeException(e)
  * A Lucene-backed indexer capable of indexing blocks and block headers.
  */
 class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWriter, BlockchainIndexReader {
+
   private val searcherManager: SearcherManager
 
   init {
@@ -368,6 +377,10 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
     }
   }
 
+  override fun indexTransaction(transaction: Transaction) {
+    // TODO
+  }
+
   override fun indexTransactionReceipt(txReceipt: TransactionReceipt, txIndex: Int, txHash: Bytes, blockHash: Bytes) {
     val document = mutableListOf<IndexableField>()
     val id = toBytesRef(txHash)
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index 5a91036..1a394d2 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -21,6 +21,7 @@ import org.apache.tuweni.eth.Block
 import org.apache.tuweni.eth.BlockBody
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.Transaction
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.kv.KeyValueStore
 import org.apache.tuweni.units.bigints.UInt256
@@ -39,12 +40,17 @@ class BlockchainRepository
  * @param chainMetadata the key-value store to store chain metadata
  * @param blockBodyStore the key-value store to store block bodies
  * @param blockHeaderStore the key-value store to store block headers
+ * @param transactionReceiptStore the key-value store to store transaction receipts
+ * @param transactionStore the key-value store to store transactions
+ * @param stateStore the key-value store to store state
  * @param blockchainIndex the blockchain index to index values
  */(
    private val chainMetadata: KeyValueStore<Bytes, Bytes>,
    private val blockBodyStore: KeyValueStore<Bytes, Bytes>,
    private val blockHeaderStore: KeyValueStore<Bytes, Bytes>,
-   private val transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
+   private val transactionReceiptStore: KeyValueStore<Bytes, Bytes>,
+   private val transactionStore: KeyValueStore<Bytes, Bytes>,
+   private val stateStore: KeyValueStore<Bytes, Bytes>,
    private val blockchainIndex: BlockchainIndex
  ) {
 
@@ -63,6 +69,8 @@ class BlockchainRepository
       blockHeaderStore: KeyValueStore<Bytes, Bytes>,
       chainMetadata: KeyValueStore<Bytes, Bytes>,
       transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
+      transactionStore: KeyValueStore<Bytes, Bytes>,
+      stateStore: KeyValueStore<Bytes, Bytes>,
       blockchainIndex: BlockchainIndex,
       genesisBlock: Block
     ): BlockchainRepository {
@@ -70,6 +78,8 @@ class BlockchainRepository
         blockBodyStore,
         blockHeaderStore,
         transactionReceiptsStore,
+        transactionStore,
+        stateStore,
         blockchainIndex)
       repo.setGenesisBlock(genesisBlock)
       repo.storeBlock(genesisBlock)
@@ -81,13 +91,22 @@ class BlockchainRepository
    * Stores a block body into the repository.
    *
    * @param blockBody the block body to store
-   * @return a handle to the storage operation completion
    */
   suspend fun storeBlockBody(blockHash: Hash, blockBody: BlockBody) {
     blockBodyStore.put(blockHash, blockBody.toBytes())
   }
 
   /**
+   * Stores state node data into the repository.
+   *
+   * @param bytes the node data to store
+   * @return a handle to the storage operation completion
+   */
+  suspend fun storeNodeData(hash: Hash, bytes: Bytes) {
+    stateStore.put(hash, bytes)
+  }
+
+  /**
    * Stores a block into the repository.
    *
    * @param block the block to store
@@ -113,7 +132,7 @@ class BlockchainRepository
     txHash: Bytes,
     blockHash: Bytes
   ) {
-    transactionReceiptsStore.put(txHash, transactionReceipt.toBytes())
+    transactionReceiptStore.put(txHash, transactionReceipt.toBytes())
     indexTransactionReceipt(transactionReceipt, txIndex, txHash, blockHash)
   }
 
@@ -259,7 +278,7 @@ class BlockchainRepository
    */
   suspend fun retrieveTransactionReceipts(blockHash: Bytes): List<TransactionReceipt> {
     return blockchainIndex.findBy(TransactionReceiptFields.BLOCK_HASH, blockHash).mapNotNull {
-      transactionReceiptsStore.get(it)?.let { TransactionReceipt.fromBytes(it) }
+      transactionReceiptStore.get(it)?.let { TransactionReceipt.fromBytes(it) }
     }
   }
 
@@ -270,7 +289,7 @@ class BlockchainRepository
    */
   suspend fun retrieveTransactionReceipt(blockHash: Bytes, index: Int): TransactionReceipt? {
     return blockchainIndex.findByBlockHashAndIndex(blockHash, index)?.let {
-      transactionReceiptsStore.get(it)?.let { TransactionReceipt.fromBytes(it) }
+      transactionReceiptStore.get(it)?.let { TransactionReceipt.fromBytes(it) }
     }
   }
 
@@ -279,7 +298,7 @@ class BlockchainRepository
    * @param txHash the hash of the transaction
    */
   suspend fun retrieveTransactionReceipt(txHash: Hash): TransactionReceipt? {
-    return transactionReceiptsStore.get(txHash)?.let { TransactionReceipt.fromBytes(it) }
+    return transactionReceiptStore.get(txHash)?.let { TransactionReceipt.fromBytes(it) }
   }
 
   /**
@@ -308,4 +327,15 @@ class BlockchainRepository
   }
 
   fun retrieveChainHeadTotalDifficulty(): UInt256 = blockchainIndex.chainHeadTotalDifficulty()
+
+  suspend fun retrieveNodeData(hashes: List<Hash>): List<Bytes?> {
+    return hashes.map {
+      stateStore.get(it)
+    }
+  }
+
+  suspend fun storeTransaction(transaction: Transaction) {
+    transactionStore.put(transaction.hash, transaction.toBytes())
+    blockchainIndex.indexTransaction(transaction)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org