You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/06 22:39:25 UTC
[incubator-tuweni] branch master updated: completing EthController,
adding containsKey to KV stores
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 7afe4d2 completing EthController, adding containsKey to KV stores
7afe4d2 is described below
commit 7afe4d2d5eb036f2358f63309552f867b4189016
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Jun 6 15:39:08 2020 -0700
completing EthController, adding containsKey to KV stores
---
.../tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt | 23 ++---
.../org/apache/tuweni/devp2p/eth/EthClient.kt | 4 +
.../org/apache/tuweni/devp2p/eth/EthController.kt | 22 +++++
.../org/apache/tuweni/devp2p/eth/EthHandler.kt | 11 ++-
.../apache/tuweni/devp2p/eth/EthRequestsManager.kt | 1 +
.../tuweni/eth/repository/BlockchainIndex.kt | 102 +++++++++++++++------
.../tuweni/eth/repository/BlockchainRepository.kt | 27 +++++-
.../apache/tuweni/kv/EntityManagerKeyValueStore.kt | 11 +++
.../apache/tuweni/kv/InfinispanKeyValueStore.kt | 2 +
.../kotlin/org/apache/tuweni/kv/KeyValueStore.kt | 16 ++++
.../org/apache/tuweni/kv/LevelDBKeyValueStore.kt | 2 +
.../org/apache/tuweni/kv/MapDBKeyValueStore.kt | 2 +
.../org/apache/tuweni/kv/MapKeyValueStore.kt | 2 +
.../org/apache/tuweni/kv/ProxyKeyValueStore.kt | 2 +
.../org/apache/tuweni/kv/RedisKeyValueStore.kt | 2 +
.../org/apache/tuweni/kv/RocksDBKeyValueStore.kt | 2 +
.../org/apache/tuweni/kv/SQLKeyValueStore.kt | 11 +++
17 files changed, 195 insertions(+), 47 deletions(-)
diff --git a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
index ce883cf..73d195a 100644
--- a/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
+++ b/devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt
@@ -40,7 +40,7 @@ import org.junit.jupiter.api.extension.ExtendWith
import java.net.InetSocketAddress
@ExtendWith(LuceneIndexWriterExtension::class, VertxExtension::class, BouncyCastleExtension::class)
-class EthHandlerTest {
+class ConnectToAnotherNodeTest {
/**
* To run this test, run an Ethereum mainnet node at home and point this test to it.
@@ -87,16 +87,17 @@ class EthHandlerTest {
InetSocketAddress("192.168.88.46", 30303)
).await()
- val client = service.getClient(EthSubprotocol.ETH62) as EthRequestsManager
- client.requestBlockHeaders(genesisBlock.header.hash, 100, 0, false).await()
-
- val header = repository.findBlockByHashOrNumber(UInt256.valueOf(99L).toBytes())
- Assertions.assertFalse(header.isEmpty())
-
- val header3 = repository.findBlockByHashOrNumber(UInt256.valueOf(101L).toBytes())
- Assertions.assertTrue(header3.isEmpty())
- val header2 = repository.findBlockByHashOrNumber(UInt256.valueOf(100L).toBytes())
- Assertions.assertTrue(header2.isEmpty())
+// val client = service.getClient(EthSubprotocol.ETH62) as EthRequestsManager
+// client.requestBlockHeaders(genesisBlock.header.hash, 100, 0, false).await()
+//
+// val header = repository.findBlockByHashOrNumber(UInt256.valueOf(99L).toBytes())
+// Assertions.assertFalse(header.isEmpty())
+//
+// val header3 = repository.findBlockByHashOrNumber(UInt256.valueOf(101L).toBytes())
+// Assertions.assertTrue(header3.isEmpty())
+// val header2 = repository.findBlockByHashOrNumber(UInt256.valueOf(100L).toBytes())
+// Assertions.assertTrue(header2.isEmpty())
+ Thread.sleep(100000)
service.stop().await()
}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
index 49fc4fb..d97bbed 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
@@ -62,6 +62,10 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
return completion
}
+ /**
+  * Requests the header of every block hash in [blockHashes], issuing one
+  * [requestBlockHeader] call per hash.
+  *
+  * @param blockHashes the hashes of the blocks whose headers are wanted
+  * @return the completion produced by [AsyncCompletion.allOf] over the individual requests
+  */
+ override fun requestBlockHeaders(blockHashes: List<Hash>): AsyncCompletion {
+   val pendingRequests = blockHashes.stream().map { hash -> requestBlockHeader(hash) }
+   return AsyncCompletion.allOf(pendingRequests)
+ }
+
override fun requestBlockHeader(blockHash: Hash): AsyncCompletion {
val conn = service.repository().asIterable(EthSubprotocol.ETH62).firstOrNull()
val completion = AsyncCompletion.incomplete()
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
index e593a3c..e5a0c02 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -23,9 +23,12 @@ import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.TransactionReceipt
import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
class EthController(val repository: BlockchainRepository, val requestsManager: EthRequestsManager) {
+ var highestTotalDifficulty: UInt256 = repository.retrieveChainHeadTotalDifficulty()
+
suspend fun findTransactionReceipts(hashes: List<Hash>): List<List<TransactionReceipt>> {
val receipts = ArrayList<List<TransactionReceipt>>()
hashes.forEach {
@@ -95,9 +98,21 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
+ /**
+ * Stores block headers received from a peer, then requests the block bodies and parent headers
+ * that the repository does not hold yet.
+ *
+ * Returns without doing anything when [headers] is empty or when `wasRequested` yields no handle
+ * for the first header on this connection.
+ *
+ * @param connectionId the identifier of the connection the headers arrived on
+ * @param headers the headers sent by the peer
+ */
suspend fun addNewBlockHeaders(connectionId: String, headers: List<BlockHeader>) {
- val handle = requestsManager.wasRequested(connectionId, headers.first()) ?: return
+ // first() throws NoSuchElementException on an empty list; an empty response has nothing to store.
+ val firstHeader = headers.firstOrNull() ?: return
+ val handle = requestsManager.wasRequested(connectionId, firstHeader) ?: return
+ val bodiesToRequest = mutableListOf<Hash>()
+ val headersToRequest = mutableListOf<Hash>()
headers.forEach { header ->
repository.storeBlockHeader(header)
+ if (!repository.hasBlockBody(header.hash)) {
+ bodiesToRequest.add(header.hash)
+ }
+ header.parentHash?.let {
+ if (!repository.hasBlockHeader(it)) {
+ headersToRequest.add(it)
+ }
+ }
}
+ requestsManager.requestBlockHeaders(headersToRequest)
+ requestsManager.requestBlockBodies(bodiesToRequest)
handle.complete()
}
@@ -109,4 +124,11 @@ class EthController(val repository: BlockchainRepository, val requestsManager: E
}
}
}
+
+ /**
+  * Reacts to the status message of a peer: when the peer's best block header is not in the
+  * repository, requests block headers starting from that block.
+  *
+  * @param connectionId the identifier of the connection the status arrived on (not used yet)
+  * @param status the status message sent by the peer
+  */
+ suspend fun receiveStatus(connectionId: String, status: StatusMessage) {
+   if (!repository.hasBlockHeader(status.bestHash)) {
+     // Arguments after the hash: maxHeaders = 100, skip = 5, reverse = true.
+     requestsManager.requestBlockHeaders(status.bestHash, 100, 5, true)
+   }
+ }
}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index 86ec780..fd95aca 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -59,13 +59,14 @@ internal class EthHandler(
}
}
+ /**
+ * Handles the status message of a peer.
+ *
+ * A peer whose network ID or genesis hash differs from the local blockchain info is cancelled
+ * and disconnected, and nothing else happens for it. Otherwise connect() is called on the peer's
+ * entry in peersMap and the status is forwarded to the controller.
+ *
+ * @param connectionId the identifier of the connection the status arrived on
+ * @param status the status message sent by the peer
+ */
- private fun handleStatus(connectionId: String, status: StatusMessage) {
- if (!status.networkID.equals(blockchainInfo.networkID())) {
+ private suspend fun handleStatus(connectionId: String, status: StatusMessage) {
+ if (!status.networkID.equals(blockchainInfo.networkID()) ||
+ !status.genesisHash.equals(blockchainInfo.genesisHash())) {
peersMap[connectionId]?.cancel()
service.disconnect(connectionId, DisconnectReason.SUBPROTOCOL_REASON)
+ // Stop here: a disconnected peer must not be connected or handed to the controller below.
+ return
}
peersMap[connectionId]?.connect()
- // TODO send to controller.
+ controller.receiveStatus(connectionId, status)
}
// private fun handleReceipts(receipts: Receipts) {
@@ -99,7 +100,7 @@ internal class EthHandler(
private suspend fun handleGetBlockBodies(connectionId: String, message: GetBlockBodies) {
service.send(
EthSubprotocol.ETH64,
- 6,
+ MessageType.BlockBodies.code,
connectionId,
BlockBodies(controller.findBlockBodies(message.hashes)).toBytes()
)
@@ -125,7 +126,7 @@ internal class EthHandler(
override fun handleNewPeerConnection(connectionId: String): AsyncCompletion {
service.send(
- EthSubprotocol.ETH64, 0, connectionId, StatusMessage(
+ EthSubprotocol.ETH64, MessageType.Status.code, connectionId, StatusMessage(
EthSubprotocol.ETH64.version(),
blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
blockchainInfo.bestHash(), blockchainInfo.genesisHash(), blockchainInfo.getLatestForkHash(),
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
index 3dc3181..5ea03ce 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -24,6 +24,7 @@ import org.apache.tuweni.eth.Hash
interface EthRequestsManager {
fun requestBlockHeader(blockHash: Hash): AsyncCompletion
+ fun requestBlockHeaders(blockHashes: List<Hash>): AsyncCompletion
fun requestBlockHeaders(blockHash: Hash, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
fun requestBlockHeaders(blockNumber: Long, maxHeaders: Long, skip: Long, reverse: Boolean): AsyncCompletion
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
index a4b0edd..927b25e 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainIndex.kt
@@ -16,25 +16,6 @@
*/
package org.apache.tuweni.eth.repository
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.bytes.Bytes32
-import org.apache.tuweni.eth.Address
-import org.apache.tuweni.eth.BlockHeader
-import org.apache.tuweni.eth.Hash
-import org.apache.tuweni.eth.TransactionReceipt
-import org.apache.tuweni.eth.repository.BlockHeaderFields.COINBASE
-import org.apache.tuweni.eth.repository.BlockHeaderFields.DIFFICULTY
-import org.apache.tuweni.eth.repository.BlockHeaderFields.EXTRA_DATA
-import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_LIMIT
-import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_USED
-import org.apache.tuweni.eth.repository.BlockHeaderFields.NUMBER
-import org.apache.tuweni.eth.repository.BlockHeaderFields.OMMERS_HASH
-import org.apache.tuweni.eth.repository.BlockHeaderFields.PARENT_HASH
-import org.apache.tuweni.eth.repository.BlockHeaderFields.STATE_ROOT
-import org.apache.tuweni.eth.repository.BlockHeaderFields.TIMESTAMP
-import org.apache.tuweni.eth.repository.BlockHeaderFields.TOTAL_DIFFICULTY
-import org.apache.tuweni.units.bigints.UInt256
-import org.apache.tuweni.units.ethereum.Gas
import org.apache.lucene.document.Document
import org.apache.lucene.document.Field
import org.apache.lucene.document.NumericDocValuesField
@@ -54,6 +35,25 @@ import org.apache.lucene.search.SortField
import org.apache.lucene.search.TermQuery
import org.apache.lucene.search.TermRangeQuery
import org.apache.lucene.util.BytesRef
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.bytes.Bytes32
+import org.apache.tuweni.eth.Address
+import org.apache.tuweni.eth.BlockHeader
+import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.eth.TransactionReceipt
+import org.apache.tuweni.eth.repository.BlockHeaderFields.COINBASE
+import org.apache.tuweni.eth.repository.BlockHeaderFields.DIFFICULTY
+import org.apache.tuweni.eth.repository.BlockHeaderFields.EXTRA_DATA
+import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_LIMIT
+import org.apache.tuweni.eth.repository.BlockHeaderFields.GAS_USED
+import org.apache.tuweni.eth.repository.BlockHeaderFields.NUMBER
+import org.apache.tuweni.eth.repository.BlockHeaderFields.OMMERS_HASH
+import org.apache.tuweni.eth.repository.BlockHeaderFields.PARENT_HASH
+import org.apache.tuweni.eth.repository.BlockHeaderFields.STATE_ROOT
+import org.apache.tuweni.eth.repository.BlockHeaderFields.TIMESTAMP
+import org.apache.tuweni.eth.repository.BlockHeaderFields.TOTAL_DIFFICULTY
+import org.apache.tuweni.units.bigints.UInt256
+import org.apache.tuweni.units.ethereum.Gas
import java.io.IOException
import java.io.UncheckedIOException
@@ -242,6 +242,12 @@ interface BlockchainIndexReader {
* @return the total difficulty of the header if it could be computed.
*/
fun totalDifficulty(hash: Bytes): UInt256?
+
+ /**
+ * Retrieves the largest total difficulty value of the chain, if it has been computed.
+ *
+ * @return the total difficulty of the chain head. NOTE(review): the return type allows null,
+ * presumably for "not computed yet" - confirm against the implementations.
+ */
+ fun chainHeadTotalDifficulty(): UInt256?
}
/**
@@ -370,8 +376,10 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
document += NumericDocValuesField(TransactionReceiptFields.INDEX.fieldName, txIndex.toLong())
document += StringField(TransactionReceiptFields.TRANSACTION_HASH.fieldName, id, Field.Store.NO)
- document += StringField(TransactionReceiptFields.BLOCK_HASH.fieldName, toBytesRef(blockHash),
- Field.Store.NO)
+ document += StringField(
+ TransactionReceiptFields.BLOCK_HASH.fieldName, toBytesRef(blockHash),
+ Field.Store.NO
+ )
for (log in txReceipt.getLogs()) {
document += StringField(TransactionReceiptFields.LOGGER.fieldName, toBytesRef(log.getLogger()), Field.Store.NO)
for (logTopic in log.getTopics()) {
@@ -381,10 +389,14 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
txReceipt.getStateRoot()?.let {
document += StringField(TransactionReceiptFields.STATE_ROOT.fieldName, toBytesRef(it), Field.Store.NO)
}
- document += StringField(TransactionReceiptFields.BLOOM_FILTER.fieldName,
- toBytesRef(txReceipt.getBloomFilter().toBytes()), Field.Store.NO)
- document += NumericDocValuesField(TransactionReceiptFields.CUMULATIVE_GAS_USED.fieldName,
- txReceipt.getCumulativeGasUsed())
+ document += StringField(
+ TransactionReceiptFields.BLOOM_FILTER.fieldName,
+ toBytesRef(txReceipt.getBloomFilter().toBytes()), Field.Store.NO
+ )
+ document += NumericDocValuesField(
+ TransactionReceiptFields.CUMULATIVE_GAS_USED.fieldName,
+ txReceipt.getCumulativeGasUsed()
+ )
txReceipt.getStatus()?.let {
document += NumericDocValuesField(TransactionReceiptFields.STATUS.fieldName, it.toLong())
}
@@ -402,7 +414,8 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
private fun queryTxReceiptDocs(query: Query, fields: List<BlockHeaderFields>): List<Document> {
val txQuery = BooleanQuery.Builder().add(
- query, BooleanClause.Occur.MUST)
+ query, BooleanClause.Occur.MUST
+ )
.add(TermQuery(Term("_type", "txReceipt")), BooleanClause.Occur.MUST).build()
return search(txQuery, fields.map { it.fieldName })
@@ -432,7 +445,8 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
private fun queryBlockDocs(query: Query, fields: List<BlockHeaderFields>): List<Document> {
val blockQuery = BooleanQuery.Builder().add(
- query, BooleanClause.Occur.MUST)
+ query, BooleanClause.Occur.MUST
+ )
.add(TermQuery(Term("_type", "block")), BooleanClause.Occur.MUST).build()
return search(blockQuery, fields.map { it.fieldName })
@@ -495,6 +509,34 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
}
}
+ /**
+  * Reads the total difficulty stored with the first block document returned by the sorted search.
+  *
+  * @return the stored total difficulty, or [UInt256.ZERO] when the index holds no block document
+  * or the document carries no stored total difficulty
+  * @throws IndexReadException if the index cannot be read
+  */
+ override fun chainHeadTotalDifficulty(): UInt256 {
+   var searcher: IndexSearcher? = null
+   try {
+     val acquired = searcherManager.acquire()
+     searcher = acquired
+     // NOTE(review): SortField.Type.DOC orders by document number rather than by the field's
+     // value - confirm this sort really surfaces the largest total difficulty.
+     val topDocs = acquired.search(
+       TermQuery(Term("_type", "block")),
+       1,
+       Sort(SortField.FIELD_SCORE, SortField(TOTAL_DIFFICULTY.fieldName, SortField.Type.DOC, true))
+     )
+
+     val topHit = topDocs.scoreDocs.firstOrNull() ?: return UInt256.ZERO
+
+     val doc = acquired.doc(topHit.doc, setOf(TOTAL_DIFFICULTY.fieldName))
+     // getBinaryValue returns null when the document has no stored value for the field.
+     val fieldValue = doc.getBinaryValue(TOTAL_DIFFICULTY.fieldName) ?: return UInt256.ZERO
+
+     return UInt256.fromBytes(Bytes32.wrap(fieldValue.bytes))
+   } catch (e: IOException) {
+     throw IndexReadException(e)
+   } finally {
+     // acquire() may have thrown before a searcher was obtained: release only what was acquired.
+     searcher?.let {
+       try {
+         searcherManager.release(it)
+       } catch (e: IOException) {
+         // Best effort: a failed release must not mask the result or the original exception.
+       }
+     }
+   }
+ }
+
override fun findBy(field: BlockHeaderFields, value: Gas): List<Hash> {
return findByOneTerm(field, toBytesRef(value))
}
@@ -575,10 +617,12 @@ class BlockchainIndex(private val indexWriter: IndexWriter) : BlockchainIndexWri
BooleanQuery.Builder()
.add(
TermQuery(Term(TransactionReceiptFields.BLOCK_HASH.fieldName, toBytesRef(blockHash))),
- BooleanClause.Occur.MUST)
+ BooleanClause.Occur.MUST
+ )
.add(
NumericDocValuesField.newSlowExactQuery(TransactionReceiptFields.INDEX.fieldName, index.toLong()),
- BooleanClause.Occur.MUST).build()
+ BooleanClause.Occur.MUST
+ ).build()
).firstOrNull()
}
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index 21b5d02..5a91036 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -23,6 +23,7 @@ import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.TransactionReceipt
import org.apache.tuweni.kv.KeyValueStore
+import org.apache.tuweni.units.bigints.UInt256
import org.slf4j.LoggerFactory
/**
@@ -125,7 +126,7 @@ class BlockchainRepository
suspend fun storeBlockHeader(header: BlockHeader) {
blockHeaderStore.put(header.hash, header.toBytes())
indexBlockHeader(header)
- logger.debug("Stored header {}", header.hash)
+ logger.debug("Stored header {} {}", header.number, header.hash)
}
private suspend fun indexBlockHeader(header: BlockHeader) {
@@ -169,6 +170,16 @@ class BlockchainRepository
}
/**
+ * Returns true if the store contains the block body.
+ *
+ * @param blockHash the hash of the block stored
+ * @return true if the block body store contains an entry for this hash
+ */
+ suspend fun hasBlockBody(blockHash: Bytes): Boolean {
+ return blockBodyStore.containsKey(blockHash)
+ }
+
+ /**
* Retrieves a block from the repository.
*
* @param blockHash the hash of the block stored
@@ -191,6 +202,16 @@ class BlockchainRepository
}
/**
+ * Returns true if the store contains the block header.
+ *
+ * @param blockHash the hash of the block stored
+ * @return true if the block header store contains an entry for this hash
+ */
+ suspend fun hasBlockHeader(blockHash: Bytes): Boolean {
+ return blockHeaderStore.containsKey(blockHash)
+ }
+
+ /**
* Retrieves a block header from the repository.
*
* @param blockHash the hash of the block stored
@@ -206,7 +227,7 @@ class BlockchainRepository
*
* @return the current chain head, or the genesis block if no chain head is present.
*/
- suspend fun retrieveChainHead(): Block? {
+ suspend fun retrieveChainHead(): Block {
return blockchainIndex.findByLargest(BlockHeaderFields.TOTAL_DIFFICULTY)
?.let { retrieveBlock(it) } ?: retrieveGenesisBlock()
}
@@ -285,4 +306,6 @@ class BlockchainRepository
return chainMetadata
.put(GENESIS_BLOCK, block.getHeader().getHash())
}
+
+ fun retrieveChainHeadTotalDifficulty(): UInt256 = blockchainIndex.chainHeadTotalDifficulty()
}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
index f8b2905..ab66f37 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -50,6 +50,17 @@ class EntityManagerKeyValueStore<K, V>
) = EntityManagerKeyValueStore(entityManagerProvider::get, entityClass, idAccessor::apply)
}
+ /**
+  * Returns true if an entity is stored under the key.
+  *
+  * The lookup uses `find`: `EntityManager.contains` tests whether an entity instance is managed
+  * by the persistence context and does not look up identifiers.
+  *
+  * @param key the identifier to look up
+  * @return true if `find` returns an entity for the key
+  */
+ override suspend fun containsKey(key: K): Boolean {
+   val em = entityManagerProvider()
+   em.transaction.begin()
+   try {
+     // NOTE(review): assumes the entity class is held in a property named entityClass - confirm.
+     return em.find(entityClass, key) != null
+   } finally {
+     em.transaction.commit()
+     em.close()
+   }
+ }
+
override suspend fun get(key: K): V? {
val em = entityManagerProvider()
em.transaction.begin()
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 9bc0f36..03f62a9 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -43,6 +43,8 @@ class InfinispanKeyValueStore<K, V> constructor(
fun <K, V> open(cache: Cache<K, V>) = InfinispanKeyValueStore(cache)
}
+ // Asks the cache asynchronously whether it holds the key and suspends until the answer arrives.
+ override suspend fun containsKey(key: K): Boolean {
+   val lookup = cache.containsKeyAsync(key)
+   return lookup.await()
+ }
+
override suspend fun get(key: K): V? = cache.getAsync(key).await()
override suspend fun put(key: K, value: V) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 3f20621..5aaab72 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -29,6 +29,22 @@ import java.io.Closeable
interface KeyValueStore<K, V> : Closeable, CoroutineScope {
/**
+ * Returns true if the store contains the key.
+ *
+ * @param key The key for the content.
+ * @return true if an entry with the key exists in the store.
+ */
+ suspend fun containsKey(key: K): Boolean
+
+ /**
+ * Returns true if the store contains the key.
+ *
+ * The default implementation wraps [containsKey] in `asyncResult`.
+ * NOTE(review): declared `suspend` although it already returns an [AsyncResult] - confirm this
+ * is intended.
+ *
+ * @param key The key for the content.
+ * @return An [AsyncResult] that will complete with a boolean result.
+ */
+ suspend fun containsKeyAsync(key: K): AsyncResult<Boolean> = asyncResult { containsKey(key) }
+
+ /**
* Retrieves data from the store.
*
* @param key The key for the content.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index 90f0fab..9d50866 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -142,6 +142,8 @@ constructor(
return JniDBFactory.factory.open(dbPath.toFile(), options)
}
+ // A key is present when the database returns a non-null value for its serialized form.
+ override suspend fun containsKey(key: K): Boolean {
+   val rawKey = keySerializer(key).toArrayUnsafe()
+   return db[rawKey] != null
+ }
+
override suspend fun get(key: K): V? {
val rawValue = db[keySerializer(key).toArrayUnsafe()]
return if (rawValue == null) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index 02908fb..3d4b509 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -110,6 +110,8 @@ constructor(
).createOrOpen()
}
+ // Looks the serialized key up in the backing storage map.
+ override suspend fun containsKey(key: K): Boolean {
+   val serializedKey = keySerializer(key)
+   return storageData.containsKey(serializedKey)
+ }
+
override suspend fun get(key: K): V? {
val value = storageData[keySerializer(key)]
return if (value == null) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index e7f0e84..389f98b 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -53,6 +53,8 @@ constructor(
fun <K, V> open(map: MutableMap<K, V>) = MapKeyValueStore(map)
}
+ // Delegates directly to the backing map.
+ override suspend fun containsKey(key: K): Boolean {
+   return map.containsKey(key)
+ }
+
override suspend fun get(key: K): V? = map[key]
override suspend fun put(key: K, value: V) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
index 9dd8bea..8c82bc8 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
@@ -57,6 +57,8 @@ class ProxyKeyValueStore<K, V, E, R>(
ProxyKeyValueStore(store, unproxyKey::apply, proxyKey::apply, unproxyValue::apply, proxyValue::apply)
}
+ // Converts the key with proxyKey and asks the wrapped store.
+ override suspend fun containsKey(key: K): Boolean {
+   val proxiedKey = proxyKey(key)
+   return store.containsKey(proxiedKey)
+ }
+
override suspend fun get(key: K): V? {
val value = store.get(proxyKey(key))
return if (value == null) {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index b73d87f..6e98dad 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -212,6 +212,8 @@ class RedisKeyValueStore<K, V>(
asyncCommands = conn.async()
}
+ /**
+  * Returns true if a value is stored under the key.
+  *
+  * The command future is awaited before its result is inspected: the future object itself is
+  * never null, so comparing it to null would report every key as present.
+  *
+  * @param key the key to look up
+  * @return true if the GET command yields a non-null value
+  */
+ // NOTE(review): relies on the coroutine `await` extension for futures being imported in this
+ // file (the sibling `get` chains on the same future type) - confirm.
+ override suspend fun containsKey(key: K): Boolean =
+   asyncCommands.get(keySerializer(key)).await() != null
+
override suspend fun get(key: K): V? = asyncCommands.get(keySerializer(key)).thenApply {
if (it == null) {
null
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index e7f175e..39cb158 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -120,6 +120,8 @@ constructor(
return RocksDB.open(options, dbPath.toAbsolutePath().toString())
}
+ /**
+  * Returns true if the database holds a value for the key.
+  *
+  * @param key the key to look up
+  * @return true if the database returns a non-null value for the serialized key
+  * @throws IllegalStateException if the store has been closed
+  */
+ override suspend fun containsKey(key: K): Boolean {
+   // Same guard as get(): refuse to touch the database once the store is closed.
+   if (closed.get()) {
+     throw IllegalStateException("Closed DB")
+   }
+   return db[keySerializer(key).toArrayUnsafe()] != null
+ }
+
override suspend fun get(key: K): V? {
if (closed.get()) {
throw IllegalStateException("Closed DB")
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index c9095cc..2f54d72 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -123,6 +123,17 @@ constructor(
connectionPool = BoneCP(config)
}
+ /**
+  * Returns true if the table holds a row for the key.
+  *
+  * The statement and its result set are closed before the connection is released.
+  *
+  * @param key the key to look up
+  * @return true if the query returns at least one row
+  */
+ override suspend fun containsKey(key: K): Boolean =
+   connectionPool.asyncConnection.await().use { connection ->
+     val sql = "SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?"
+     connection.prepareStatement(sql).use { stmt ->
+       stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
+       stmt.executeQuery().use { rows -> rows.next() }
+     }
+   }
+
override suspend fun get(key: K): V? {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org