You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/06 22:39:25 UTC

[incubator-tuweni] branch master updated: completing EthController, adding containsKey to KV stores

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 7afe4d2  completing EthController, adding containsKey to KV stores
7afe4d2 is described below

commit 7afe4d2d5eb036f2358f63309552f867b4189016
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Jun 6 15:39:08 2020 -0700

    completing EthController, adding containsKey to KV stores
---
 .../tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt  |  23 ++---
 .../org/apache/tuweni/devp2p/eth/EthClient.kt      |   4 +
 .../org/apache/tuweni/devp2p/eth/EthController.kt  |  22 +++++
 .../org/apache/tuweni/devp2p/eth/EthHandler.kt     |  11 ++-
 .../apache/tuweni/devp2p/eth/EthRequestsManager.kt |   1 +
 .../tuweni/eth/repository/BlockchainIndex.kt       | 102 +++++++++++++++------
 .../tuweni/eth/repository/BlockchainRepository.kt  |  27 +++++-
 .../apache/tuweni/kv/EntityManagerKeyValueStore.kt |  11 +++
 .../apache/tuweni/kv/InfinispanKeyValueStore.kt    |   2 +
 .../kotlin/org/apache/tuweni/kv/KeyValueStore.kt   |  16 ++++
 .../org/apache/tuweni/kv/LevelDBKeyValueStore.kt   |   2 +
 .../org/apache/tuweni/kv/MapDBKeyValueStore.kt     |   2 +
 .../org/apache/tuweni/kv/MapKeyValueStore.kt       |   2 +
 .../org/apache/tuweni/kv/ProxyKeyValueStore.kt     |   2 +
 .../org/apache/tuweni/kv/RedisKeyValueStore.kt     |   2 +
 .../org/apache/tuweni/kv/RocksDBKeyValueStore.kt   |   2 +
 .../org/apache/tuweni/kv/SQLKeyValueStore.kt       |  11 +++
 17 files changed, 195 insertions(+), 47 deletions(-)

diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
index ce883cf..73d195a 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
@@ -40,7 +40,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 import java.net.InetSocketAddress
 
 @ExtendWith(LuceneIndexWriterExtension::class, VertxExtension::class, BouncyCastleExtension::class)
-class EthHandlerTest {
+class ConnectToAnotherNodeTest {
 
   /**
    * To run this test, run an Ethereum mainnet node at home and point this test to it.
@@ -87,16 +87,17 @@ class EthHandlerTest {
       InetSocketAddress("192.168.88.46", 30303)
     ).await()
 
-    val client = service.getClient(EthSubprotocol.ETH62) as EthRequestsManager
-    client.requestBlockHeaders(genesisBlock.header.hash, 100, 0, false).await()
-
-    val header = repository.findBlockByHashOrNumber(UInt256.valueOf(99L).toBytes())
-    Assertions.assertFalse(header.isEmpty())
-
-    val header3 = repository.findBlockByHashOrNumber(UInt256.valueOf(101L).toBytes())
-    Assertions.assertTrue(header3.isEmpty())
-    val header2 = repository.findBlockByHashOrNumber(UInt256.valueOf(100L).toBytes())
-    Assertions.assertTrue(header2.isEmpty())
+//    val client = service.getClient(EthSubprotocol.ETH62) as EthRequestsManager
+//    client.requestBlockHeaders(genesisBlock.header.hash, 100, 0, false).await()
+//
+//    val header = repository.findBlockByHashOrNumber(UInt256.valueOf(99L).toBytes())
+//    Assertions.assertFalse(header.isEmpty())
+//
+//    val header3 = repository.findBlockByHashOrNumber(UInt256.valueOf(101L).toBytes())
+//    Assertions.assertTrue(header3.isEmpty())
+//    val header2 = repository.findBlockByHashOrNumber(UInt256.valueOf(100L).toBytes())
+//    Assertions.assertTrue(header2.isEmpty())
+    Thread.sleep(100000)
     service.stop().await()
   }
 
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
index 49fc4fb..d97bbed 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
@@ -62,6 +62,10 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
     return completion
   }
 
+  override fun requestBlockHeaders(blockHashes: List<Hash>): AsyncCompletion {
+    return AsyncCompletion.allOf(blockHashes.stream().map { requestBlockHeader(it) })
+  }
+
   override fun requestBlockHeader(blockHash: Hash): AsyncCompletion {
     val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
     val completion = AsyncCompletion.incomplete()
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
index e593a3c..e5a0c02 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -23,9 +23,12 @@ import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
 
 class EthController(val repository: BlockchainRepository, val requestsManager: EthRequestsManager) {
 
+  var highestTotalDifficulty: UInt256 = repository.retrieveChainHeadTotalDifficulty()
+
   suspend fun findTransactionReceipts(hashes: List<Hash>): List<List<TransactionReceipt>> {
     val receipts = ArrayList<List<TransactionReceipt>>()
     hashes.forEach {
@@ -95,9 +98,21 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
 
   suspend fun addNewBlockHeaders(connectionId: String, headers: List<BlockHeader>) {
     val handle = requestsManager.wasRequested(connectionId, headers.first()) ?: return
+    val bodiesToRequest = mutableListOf<Hash>()
+    val headersToRequest = mutableListOf<Hash>()
     headers.forEach { header ->
       repository.storeBlockHeader(header)
+      if (!repository.hasBlockBody(header.hash)) {
+        bodiesToRequest.add(header.hash)
+      }
+      header.parentHash?.let {
+        if (!repository.hasBlockHeader(it)) {
+          headersToRequest.add(it)
+        }
+      }
     }
+    requestsManager.requestBlockHeaders(headersToRequest)
+    requestsManager.requestBlockBodies(bodiesToRequest)
     handle.complete()
   }
 
@@ -109,4 +124,11 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
       }
     }
   }
+
+  suspend fun receiveStatus(connectionId: String, status: StatusMessage) {
+    println(connectionId)
+    if (!repository.hasBlockHeader(status.bestHash)) {
+      requestsManager.requestBlockHeaders(status.bestHash, 100, 5, true)
+    }
+  }
 }
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index 86ec780..fd95aca 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -59,13 +59,14 @@ internal class EthHandler(
     }
   }
 
-  private fun handleStatus(connectionId: String, status: StatusMessage) {
-    if (!status.networkID.equals(blockchainInfo.networkID())) {
+  private suspend fun handleStatus(connectionId: String, status: StatusMessage) {
+    if (!status.networkID.equals(blockchainInfo.networkID()) ||
+      !status.genesisHash.equals(blockchainInfo.genesisHash())) {
       peersMap[connectionId]?.cancel()
       service.disconnect(connectionId, DisconnectReason.SUBPROTOCOL_REASON)
     }
     peersMap[connectionId]?.connect()
-    // TODO send to controller.
+    controller.receiveStatus(connectionId, status)
   }
 
 //  private fun handleReceipts(receipts: Receipts) {
@@ -99,7 +100,7 @@ internal class EthHandler(
   private suspend fun handleGetBlockBodies(connectionId: String, message: GetBlockBodies) {
     service.send(
       EthSubprotocol.ETH64,
-      6,
+      MessageType.BlockBodies.code,
       connectionId,
       BlockBodies(controller.findBlockBodies(message.hashes)).toBytes()
     )
@@ -125,7 +126,7 @@ internal class EthHandler(
 
   override fun handleNewPeerConnection(connectionId: String): AsyncCompletion {
     service.send(
-      EthSubprotocol.ETH64, 0, connectionId, StatusMessage(
+      EthSubprotocol.ETH64, MessageType.Status.code, connectionId, StatusMessage(
         EthSubprotocol.ETH64.version(),
         blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
         blockchainInfo.bestHash(), blockchainInfo.genesisHash(), blockchainInfo.getLatestForkHash(),
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
index 3dc3181..5ea03ce 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -24,6 +24,7 @@ import org.apache.tuweni.eth.Hash
 
 interface EthRequestsManager {
   fun requestBlockHeader(blockHash: Hash): AsyncCompletion
+  fun requestBlockHeaders(blockHashes: List<Hash>): AsyncCompletion
   fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
   fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
 
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
index a4b0edd..927b25e 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
@@ -16,25 +16,6 @@
  */
 package org.apache.tuweni.eth.repository
 
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.bytes.Bytes32
-import org.apache.tuweni.eth.Address
-import org.apache.tuweni.eth.BlockHeader
-import org.apache.tuweni.eth.Hash
-import org.apache.tuweni.eth.TransactionReceipt
-import org.apache.tuweni.eth.repository.BlockHeaderFields.COINBASE
-import org.apache.tuweni.eth.repository.BlockHeaderFields.DIFFICULTY
-import org.apache.tuweni.eth.repository.BlockHeaderFields.EXTRA_DATA
-import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_LIMIT
-import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_USED
-import org.apache.tuweni.eth.repository.BlockHeaderFields.NUMBER
-import org.apache.tuweni.eth.repository.BlockHeaderFields.OMMERS_HASH
-import org.apache.tuweni.eth.repository.BlockHeaderFields.PARENT_HASH
-import org.apache.tuweni.eth.repository.BlockHeaderFields.STATE_ROOT
-import org.apache.tuweni.eth.repository.BlockHeaderFields.TIMESTAMP
-import org.apache.tuweni.eth.repository.BlockHeaderFields.TOTAL_DIFFICULTY
-import org.apache.tuweni.units.bigints.UInt256
-import org.apache.tuweni.units.ethereum.Gas
 import org.apache.lucene.document.Document
 import org.apache.lucene.document.Field
 import org.apache.lucene.document.NumericDocValuesField
@@ -54,6 +35,25 @@ import org.apache.lucene.search.SortField
 import org.apache.lucene.search.TermQuery
 import org.apache.lucene.search.TermRangeQuery
 import org.apache.lucene.util.BytesRef
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.bytes.Bytes32
+import org.apache.tuweni.eth.Address
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.TransactionReceipt
+import org.apache.tuweni.eth.repository.BlockHeaderFields.COINBASE
+import org.apache.tuweni.eth.repository.BlockHeaderFields.DIFFICULTY
+import org.apache.tuweni.eth.repository.BlockHeaderFields.EXTRA_DATA
+import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_LIMIT
+import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_USED
+import org.apache.tuweni.eth.repository.BlockHeaderFields.NUMBER
+import org.apache.tuweni.eth.repository.BlockHeaderFields.OMMERS_HASH
+import org.apache.tuweni.eth.repository.BlockHeaderFields.PARENT_HASH
+import org.apache.tuweni.eth.repository.BlockHeaderFields.STATE_ROOT
+import org.apache.tuweni.eth.repository.BlockHeaderFields.TIMESTAMP
+import org.apache.tuweni.eth.repository.BlockHeaderFields.TOTAL_DIFFICULTY
+import org.apache.tuweni.units.bigints.UInt256
+import org.apache.tuweni.units.ethereum.Gas
 import java.io.IOException
 import java.io.UncheckedIOException
 
@@ -242,6 +242,12 @@ interface BlockchainIndexReader {
    * @return the total difficulty of the header if it could be computed.
    */
   fun totalDifficulty(hash: Bytes): UInt256?
+
+  /**
+   * Retrieves the largest total difficulty value of the chain, if it has been computed.
+   *
+   */
+  fun chainHeadTotalDifficulty(): UInt256?
 }
 
 /**
@@ -370,8 +376,10 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
 
     document += NumericDocValuesField(TransactionReceiptFields.INDEX.fieldName, txIndex.toLong())
     document += StringField(TransactionReceiptFields.TRANSACTION_HASH.fieldName, id, Field.Store.NO)
-    document += StringField(TransactionReceiptFields.BLOCK_HASH.fieldName, toBytesRef(blockHash),
-      Field.Store.NO)
+    document += StringField(
+      TransactionReceiptFields.BLOCK_HASH.fieldName, toBytesRef(blockHash),
+      Field.Store.NO
+    )
     for (log in txReceipt.getLogs()) {
       document += StringField(TransactionReceiptFields.LOGGER.fieldName, toBytesRef(log.getLogger()), Field.Store.NO)
       for (logTopic in log.getTopics()) {
@@ -381,10 +389,14 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
     txReceipt.getStateRoot()?.let {
       document += StringField(TransactionReceiptFields.STATE_ROOT.fieldName, toBytesRef(it), Field.Store.NO)
     }
-    document += StringField(TransactionReceiptFields.BLOOM_FILTER.fieldName,
-      toBytesRef(txReceipt.getBloomFilter().toBytes()), Field.Store.NO)
-    document += NumericDocValuesField(TransactionReceiptFields.CUMULATIVE_GAS_USED.fieldName,
-      txReceipt.getCumulativeGasUsed())
+    document += StringField(
+      TransactionReceiptFields.BLOOM_FILTER.fieldName,
+      toBytesRef(txReceipt.getBloomFilter().toBytes()), Field.Store.NO
+    )
+    document += NumericDocValuesField(
+      TransactionReceiptFields.CUMULATIVE_GAS_USED.fieldName,
+      txReceipt.getCumulativeGasUsed()
+    )
     txReceipt.getStatus()?.let {
       document += NumericDocValuesField(TransactionReceiptFields.STATUS.fieldName, it.toLong())
     }
@@ -402,7 +414,8 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
 
   private fun queryTxReceiptDocs(query: Query, fields: List<BlockHeaderFields>): List<Document> {
     val txQuery = BooleanQuery.Builder().add(
-      query, BooleanClause.Occur.MUST)
+      query, BooleanClause.Occur.MUST
+    )
       .add(TermQuery(Term("_type", "txReceipt")), BooleanClause.Occur.MUST).build()
 
     return search(txQuery, fields.map { it.fieldName })
@@ -432,7 +445,8 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
 
   private fun queryBlockDocs(query: Query, fields: List<BlockHeaderFields>): List<Document> {
     val blockQuery = BooleanQuery.Builder().add(
-      query, BooleanClause.Occur.MUST)
+      query, BooleanClause.Occur.MUST
+    )
       .add(TermQuery(Term("_type", "block")), BooleanClause.Occur.MUST).build()
 
     return search(blockQuery, fields.map { it.fieldName })
@@ -495,6 +509,34 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
     }
   }
 
+  override fun chainHeadTotalDifficulty(): UInt256 {
+    var searcher: IndexSearcher? = null
+    try {
+      searcher = searcherManager.acquire()
+      val topDocs = searcher!!.search(
+        TermQuery(Term("_type", "block")),
+        1,
+        Sort(SortField.FIELD_SCORE, SortField(TOTAL_DIFFICULTY.fieldName, SortField.Type.DOC, true))
+      )
+
+      if (topDocs.scoreDocs.isEmpty()) {
+        return UInt256.ZERO
+      }
+
+      val doc = searcher.doc(topDocs.scoreDocs[0].doc, setOf(TOTAL_DIFFICULTY.fieldName))
+      val fieldValue = doc.getBinaryValue(TOTAL_DIFFICULTY.fieldName)
+
+      return UInt256.fromBytes(Bytes32.wrap(fieldValue.bytes))
+    } catch (e: IOException) {
+      throw IndexReadException(e)
+    } finally {
+      try {
+        searcherManager.release(searcher)
+      } catch (e: IOException) {
+      }
+    }
+  }
+
   override fun findBy(field: BlockHeaderFields, value: Gas): List<Hash> {
     return findByOneTerm(field, toBytesRef(value))
   }
@@ -575,10 +617,12 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
       BooleanQuery.Builder()
         .add(
           TermQuery(Term(TransactionReceiptFields.BLOCK_HASH.fieldName, toBytesRef(blockHash))),
-          BooleanClause.Occur.MUST)
+          BooleanClause.Occur.MUST
+        )
         .add(
           NumericDocValuesField.newSlowExactQuery(TransactionReceiptFields.INDEX.fieldName, index.toLong()),
-          BooleanClause.Occur.MUST).build()
+          BooleanClause.Occur.MUST
+        ).build()
     ).firstOrNull()
   }
 
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index 21b5d02..5a91036 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -23,6 +23,7 @@ import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.kv.KeyValueStore
+import org.apache.tuweni.units.bigints.UInt256
 import org.slf4j.LoggerFactory
 
 /**
@@ -125,7 +126,7 @@ class BlockchainRepository
   suspend fun storeBlockHeader(header: BlockHeader) {
     blockHeaderStore.put(header.hash, header.toBytes())
     indexBlockHeader(header)
-    logger.debug("Stored header {}", header.hash)
+    logger.debug("Stored header {} {}", header.number, header.hash)
   }
 
   private suspend fun indexBlockHeader(header: BlockHeader) {
@@ -169,6 +170,16 @@ class BlockchainRepository
   }
 
   /**
+   * Returns true if the store contains the block body.
+   *
+   * @param blockHash the hash of the block stored
+   * @return a future with a boolean result
+   */
+  suspend fun hasBlockBody(blockHash: Bytes): Boolean {
+    return blockBodyStore.containsKey(blockHash)
+  }
+
+  /**
    * Retrieves a block into the repository.
    *
    * @param blockHash the hash of the block stored
@@ -191,6 +202,16 @@ class BlockchainRepository
   }
 
   /**
+   * Returns true if the store contains the block header.
+   *
+   * @param blockHash the hash of the block stored
+   * @return a future with a boolean result
+   */
+  suspend fun hasBlockHeader(blockHash: Bytes): Boolean {
+    return blockHeaderStore.containsKey(blockHash)
+  }
+
+  /**
    * Retrieves a block header into the repository.
    *
    * @param blockHash the hash of the block stored
@@ -206,7 +227,7 @@ class BlockchainRepository
    *
    * @return the current chain head, or the genesis block if no chain head is present.
    */
-  suspend fun retrieveChainHead(): Block? {
+  suspend fun retrieveChainHead(): Block {
     return blockchainIndex.findByLargest(BlockHeaderFields.TOTAL_DIFFICULTY)
       ?.let { retrieveBlock(it) } ?: retrieveGenesisBlock()
   }
@@ -285,4 +306,6 @@ class BlockchainRepository
     return chainMetadata
       .put(GENESIS_BLOCK, block.getHeader().getHash())
   }
+
+  fun retrieveChainHeadTotalDifficulty(): UInt256 = blockchainIndex.chainHeadTotalDifficulty()
 }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
index f8b2905..ab66f37 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -50,6 +50,17 @@ class EntityManagerKeyValueStore<K, V>
     ) = EntityManagerKeyValueStore(entityManagerProvider::get, entityClass, idAccessor::apply)
   }
 
+  override suspend fun containsKey(key: K): Boolean {
+    val em = entityManagerProvider()
+    em.transaction.begin()
+    try {
+      return em.contains(key)
+    } finally {
+      em.transaction.commit()
+      em.close()
+    }
+  }
+
   override suspend fun get(key: K): V? {
     val em = entityManagerProvider()
     em.transaction.begin()
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 9bc0f36..03f62a9 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -43,6 +43,8 @@ class InfinispanKeyValueStore<K, V> constructor(
     fun <K, V> open(cache: Cache<K, V>) = InfinispanKeyValueStore(cache)
   }
 
+  override suspend fun containsKey(key: K): Boolean = cache.containsKeyAsync(key).await()
+
   override suspend fun get(key: K): V? = cache.getAsync(key).await()
 
   override suspend fun put(key: K, value: V) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 3f20621..5aaab72 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -29,6 +29,22 @@ import java.io.Closeable
 interface KeyValueStore<K, V> : Closeable, CoroutineScope {
 
   /**
+   * Returns true if the store contains the key.
+   *
+   * @param key The key for the content.
+   * @return true if an entry with the key exists in the store.
+   */
+  suspend fun containsKey(key: K): Boolean
+
+  /**
+   * Returns true if the store contains the key.
+   *
+   * @param key The key for the content.
+   * @return An [AsyncResult] that will complete with a boolean result.
+   */
+  suspend fun containsKeyAsync(key: K): AsyncResult<Boolean> = asyncResult { containsKey(key) }
+
+  /**
    * Retrieves data from the store.
    *
    * @param key The key for the content.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index 90f0fab..9d50866 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -142,6 +142,8 @@ constructor(
     return JniDBFactory.factory.open(dbPath.toFile(), options)
   }
 
+  override suspend fun containsKey(key: K): Boolean = db[keySerializer(key).toArrayUnsafe()] != null
+
   override suspend fun get(key: K): V? {
     val rawValue = db[keySerializer(key).toArrayUnsafe()]
     return if (rawValue == null) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index 02908fb..3d4b509 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -110,6 +110,8 @@ constructor(
     ).createOrOpen()
   }
 
+  override suspend fun containsKey(key: K): Boolean = storageData.containsKey(keySerializer(key))
+
   override suspend fun get(key: K): V? {
     val value = storageData[keySerializer(key)]
     return if (value == null) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index e7f0e84..389f98b 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -53,6 +53,8 @@ constructor(
     fun <K, V> open(map: MutableMap<K, V>) = MapKeyValueStore(map)
   }
 
+  override suspend fun containsKey(key: K) = map.containsKey(key)
+
   override suspend fun get(key: K): V? = map[key]
 
   override suspend fun put(key: K, value: V) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
index 9dd8bea..8c82bc8 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
@@ -57,6 +57,8 @@ class ProxyKeyValueStore<K, V, E, R>(
       ProxyKeyValueStore(store, unproxyKey::apply, proxyKey::apply, unproxyValue::apply, proxyValue::apply)
   }
 
+  override suspend fun containsKey(key: K) = store.containsKey(proxyKey(key))
+
   override suspend fun get(key: K): V? {
     val value = store.get(proxyKey(key))
     return if (value == null) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index b73d87f..6e98dad 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -212,6 +212,8 @@ class RedisKeyValueStore<K, V>(
     asyncCommands = conn.async()
   }
 
+  override suspend fun containsKey(key: K) = asyncCommands.get(keySerializer(key)) != null
+
   override suspend fun get(key: K): V? = asyncCommands.get(keySerializer(key)).thenApply {
     if (it == null) {
       null
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index e7f175e..39cb158 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -120,6 +120,8 @@ constructor(
     return RocksDB.open(options, dbPath.toAbsolutePath().toString())
   }
 
+  override suspend fun containsKey(key: K) = db[keySerializer(key).toArrayUnsafe()] != null
+
   override suspend fun get(key: K): V? {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index c9095cc..2f54d72 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -123,6 +123,17 @@ constructor(
     connectionPool = BoneCP(config)
   }
 
+  override suspend fun containsKey(key: K): Boolean {
+    connectionPool.asyncConnection.await().use {
+      val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
+      stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
+      stmt.execute()
+
+      val rs = stmt.resultSet
+      return rs.next()
+    }
+  }
+
   override suspend fun get(key: K): V? {
       connectionPool.asyncConnection.await().use {
         val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org