You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/02/12 01:29:53 UTC

[incubator-tuweni] branch master updated: Update KV - add keys, allow updates on sql

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b118bc  Update KV - add keys, allow updates on sql
     new 520085e  Merge pull request #46 from atoulme/update_kv
4b118bc is described below

commit 4b118bc686d9b813f284ab62f15050d9a3d7901b
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Tue Feb 11 15:42:52 2020 -0800

    Update KV - add keys, allow updates on sql
---
 .../apache/tuweni/kv/InfinispanKeyValueStore.kt    |  9 ++-
 .../kotlin/org/apache/tuweni/kv/KeyValueStore.kt   | 39 +++++--------
 .../org/apache/tuweni/kv/LevelDBKeyValueStore.kt   | 26 ++++++---
 .../org/apache/tuweni/kv/MapDBKeyValueStore.kt     | 15 +++--
 .../org/apache/tuweni/kv/MapKeyValueStore.kt       |  9 ++-
 .../org/apache/tuweni/kv/RedisKeyValueStore.kt     | 11 +++-
 .../org/apache/tuweni/kv/RocksDBKeyValueStore.kt   | 34 +++++++++---
 .../org/apache/tuweni/kv/SQLKeyValueStore.kt       | 47 +++++++++++++---
 .../org/apache/tuweni/kv/KeyValueStoreTest.java    | 14 +++++
 .../org/apache/tuweni/kv/KeyValueStoreSpec.kt      | 64 ++++++++++++++++++++++
 10 files changed, 207 insertions(+), 61 deletions(-)

diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 0a8eab4..85a8b92 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -16,15 +16,20 @@
  */
 package org.apache.tuweni.kv
 
+import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.future.await
 import org.apache.tuweni.bytes.Bytes
 import org.infinispan.Cache
+import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by [Infinispan](https://infinispan.org)
  *
  */
-class InfinispanKeyValueStore constructor(private val cache: Cache<Bytes, Bytes>) : KeyValueStore {
+class InfinispanKeyValueStore constructor(
+  private val cache: Cache<Bytes, Bytes>,
+  override val coroutineContext: CoroutineContext = Dispatchers.IO
+) : KeyValueStore {
 
   companion object {
 
@@ -44,6 +49,8 @@ class InfinispanKeyValueStore constructor(private val cache: Cache<Bytes, Bytes>
     cache.putAsync(key, value).await()
   }
 
+  override suspend fun keys(): Iterable<Bytes> = cache.keys
+
   /**
    * The cache is managed outside the scope of this key-value store.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 2ca24e0..588eddf 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -16,9 +16,7 @@
  */
 package org.apache.tuweni.kv
 
-import kotlinx.coroutines.CoroutineDispatcher
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.CoroutineScope
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.AsyncResult
@@ -29,7 +27,7 @@ import java.io.Closeable
 /**
  * A key-value store.
  */
-interface KeyValueStore : Closeable {
+interface KeyValueStore : Closeable, CoroutineScope {
 
   /**
    * Retrieves data from the store.
@@ -46,18 +44,7 @@ interface KeyValueStore : Closeable {
    * @return An [AsyncResult] that will complete with the stored content,
    *         or an empty optional if no content was available.
    */
-  fun getAsync(key: Bytes): AsyncResult<Bytes?> = getAsync(Dispatchers.Default, key)
-
-  /**
-   * Retrieves data from the store.
-   *
-   * @param key The key for the content.
-   * @param dispatcher The co-routine dispatcher for asynchronous tasks.
-   * @return An [AsyncResult] that will complete with the stored content,
-   *         or an empty optional if no content was available.
-   */
-  fun getAsync(dispatcher: CoroutineDispatcher, key: Bytes): AsyncResult<Bytes?> =
-    GlobalScope.asyncResult(dispatcher) { get(key) }
+  fun getAsync(key: Bytes): AsyncResult<Bytes?> = asyncResult { get(key) }
 
   /**
    * Puts data into the store.
@@ -77,19 +64,19 @@ interface KeyValueStore : Closeable {
    * @param value The data to store.
    * @return An [AsyncCompletion] that will complete when the content is stored.
    */
-  fun putAsync(key: Bytes, value: Bytes): AsyncCompletion = putAsync(Dispatchers.Default, key, value)
+  fun putAsync(key: Bytes, value: Bytes): AsyncCompletion = asyncCompletion { put(key, value) }
 
   /**
-   * Puts data into the store.
+   * Provides an iterator over the keys of the store.
    *
-   * Note: if the storage implementation already contains content for the given key, it does not need to replace the
-   * existing content.
+   * @return An [Iterable] allowing to iterate over the set of keys.
+   */
+  suspend fun keys(): Iterable<Bytes>
+
+  /**
+   * Asynchronously provides an iterator over the keys of the store.
    *
-   * @param key The key to associate with the data, for use when retrieving.
-   * @param value The data to store.
-   * @param dispatcher The co-routine dispatcher for asynchronous tasks.
-   * @return An [AsyncCompletion] that will complete when the content is stored.
+   * @return An [AsyncResult] that will complete with an [Iterable] over the set of keys.
    */
-  fun putAsync(dispatcher: CoroutineDispatcher, key: Bytes, value: Bytes): AsyncCompletion =
-    GlobalScope.asyncCompletion(dispatcher) { put(key, value) }
+  fun keysAsync(): AsyncResult<Iterable<Bytes>> = asyncResult { keys() }
 }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index 4513dd0..21ba81d 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -16,23 +16,23 @@
  */
 package org.apache.tuweni.kv
 
-import kotlinx.coroutines.CoroutineDispatcher
 import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.withContext
 import org.apache.tuweni.bytes.Bytes
 import org.fusesource.leveldbjni.JniDBFactory
 import org.iq80.leveldb.DB
+import org.iq80.leveldb.DBIterator
 import org.iq80.leveldb.Options
 import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Path
+import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by LevelDB.
  *
  * @param dbPath The path to the levelDB database.
  * @param options Options for the levelDB database.
- * @param dispatcher The co-routine context for blocking tasks.
+ * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a LevelDB-backed key-value store.
@@ -42,7 +42,7 @@ class LevelDBKeyValueStore
 constructor(
   dbPath: Path,
   options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
-  private val dispatcher: CoroutineDispatcher = Dispatchers.IO
+  override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : KeyValueStore {
 
   companion object {
@@ -77,17 +77,27 @@ constructor(
     db = JniDBFactory.factory.open(dbPath.toFile(), options)
   }
 
-  override suspend fun get(key: Bytes): Bytes? = withContext(dispatcher) {
+  override suspend fun get(key: Bytes): Bytes? {
     val rawValue = db[key.toArrayUnsafe()]
-    if (rawValue == null) {
+    return if (rawValue == null) {
       null
     } else {
       Bytes.wrap(rawValue)
     }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) = withContext(dispatcher) {
-    db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+  override suspend fun put(key: Bytes, value: Bytes) = db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+
+  private class BytesIterator(val iter: DBIterator) : Iterator<Bytes> {
+    override fun hasNext(): Boolean = iter.hasNext()
+
+    override fun next(): Bytes = Bytes.wrap(iter.next().key)
+  }
+
+  override suspend fun keys(): Iterable<Bytes> = Iterable {
+    // Open a fresh cursor per traversal so the Iterable can be iterated more than once.
+    val iter = db.iterator().apply { seekToFirst() }
+    BytesIterator(iter)
   }
 
   /**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index d7934be..f5bf128 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -16,9 +16,7 @@
  */
 package org.apache.tuweni.kv
 
-import kotlinx.coroutines.CoroutineDispatcher
 import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.withContext
 import org.apache.tuweni.bytes.Bytes
 import org.mapdb.DB
 import org.mapdb.DBMaker
@@ -28,12 +26,13 @@ import org.mapdb.HTreeMap
 import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Path
+import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by a MapDB instance.
  *
  * @param dbPath The path to the MapDB database.
- * @param dispatcher The co-routine dispatcher for blocking tasks.
+ * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a MapDB-backed key-value store.
@@ -42,7 +41,7 @@ class MapDBKeyValueStore
 @Throws(IOException::class)
 constructor(
   dbPath: Path,
-  private val dispatcher: CoroutineDispatcher = Dispatchers.IO
+  override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : KeyValueStore {
 
   companion object {
@@ -71,15 +70,15 @@ constructor(
     ).createOrOpen()
   }
 
-  override suspend fun get(key: Bytes): Bytes? = withContext(dispatcher) {
-    storageData[key]
-  }
+  override suspend fun get(key: Bytes): Bytes? = storageData[key]
 
-  override suspend fun put(key: Bytes, value: Bytes) = withContext(dispatcher) {
+  override suspend fun put(key: Bytes, value: Bytes) {
     storageData[key] = value
     db.commit()
   }
 
+  override suspend fun keys(): Iterable<Bytes> = storageData.keys
+
   /**
    * Closes the underlying MapDB instance.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index bf0f622..bc15460 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -16,7 +16,9 @@
  */
 package org.apache.tuweni.kv
 
+import kotlinx.coroutines.Dispatchers
 import org.apache.tuweni.bytes.Bytes
+import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by an in-memory Map.
@@ -26,7 +28,10 @@ import org.apache.tuweni.bytes.Bytes
  * @constructor Open an in-memory key-value store.
  */
 class MapKeyValueStore
-constructor(private val map: MutableMap<Bytes, Bytes> = HashMap()) : KeyValueStore {
+constructor(
+  private val map: MutableMap<Bytes, Bytes> = HashMap(),
+  override val coroutineContext: CoroutineContext = Dispatchers.IO
+) : KeyValueStore {
 
   companion object {
     /**
@@ -55,6 +60,8 @@ constructor(private val map: MutableMap<Bytes, Bytes> = HashMap()) : KeyValueSto
     map[key] = value
   }
 
+  override suspend fun keys(): Iterable<Bytes> = map.keys
+
   /**
    * Has no effect in this KeyValueStore implementation.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index 74c4516..8e8717c 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -21,10 +21,12 @@ import io.lettuce.core.RedisURI
 import io.lettuce.core.api.StatefulRedisConnection
 import io.lettuce.core.api.async.RedisAsyncCommands
 import io.lettuce.core.codec.RedisCodec
+import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.future.await
 import org.apache.tuweni.bytes.Bytes
 import java.net.InetAddress
 import java.util.concurrent.CompletionStage
+import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by Redis.
@@ -32,7 +34,10 @@ import java.util.concurrent.CompletionStage
  * @param uri The uri to the Redis store.
  * @constructor Open a Redis-backed key-value store.
  */
-class RedisKeyValueStore(uri: String) : KeyValueStore {
+class RedisKeyValueStore(
+  uri: String,
+  override val coroutineContext: CoroutineContext = Dispatchers.IO
+) : KeyValueStore {
 
   companion object {
     /**
@@ -98,7 +103,7 @@ class RedisKeyValueStore(uri: String) : KeyValueStore {
 
   init {
     val redisClient = RedisClient.create(uri)
-    conn = redisClient.connect(RedisKeyValueStore.codec())
+    conn = redisClient.connect(codec())
     asyncCommands = conn.async()
   }
 
@@ -109,6 +114,8 @@ class RedisKeyValueStore(uri: String) : KeyValueStore {
     future.await()
   }
 
+  override suspend fun keys(): Iterable<Bytes> = asyncCommands.keys(Bytes.EMPTY).await()
+
   override fun close() {
     conn.close()
   }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index a3dac84..7911803 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -16,23 +16,23 @@
  */
 package org.apache.tuweni.kv
 
-import kotlinx.coroutines.CoroutineDispatcher
 import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.withContext
 import org.apache.tuweni.bytes.Bytes
 import org.rocksdb.Options
 import org.rocksdb.RocksDB
+import org.rocksdb.RocksIterator
 import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by RocksDB.
  *
  * @param dbPath The path to the RocksDB database.
  * @param options Options for the RocksDB database.
- * @param dispatcher The co-routine context for blocking tasks.
+ * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a RocksDB-backed key-value store.
@@ -42,7 +42,7 @@ class RocksDBKeyValueStore
 constructor(
   dbPath: Path,
   options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
-  private val dispatcher: CoroutineDispatcher = Dispatchers.IO
+  override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : KeyValueStore {
 
   companion object {
@@ -79,25 +79,45 @@ constructor(
     db = RocksDB.open(options, dbPath.toAbsolutePath().toString())
   }
 
-  override suspend fun get(key: Bytes): Bytes? = withContext(dispatcher) {
+  override suspend fun get(key: Bytes): Bytes? {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
     val rawValue = db[key.toArrayUnsafe()]
-    if (rawValue == null) {
+    return if (rawValue == null) {
       null
     } else {
       Bytes.wrap(rawValue)
     }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) = withContext(dispatcher) {
+  override suspend fun put(key: Bytes, value: Bytes) {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
     db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
   }
 
+  private class BytesIterator(val rIterator: RocksIterator) : Iterator<Bytes> {
+
+    override fun hasNext(): Boolean = rIterator.isValid
+
+    override fun next(): Bytes {
+      // Iterator contract: fail cleanly instead of reading the key of an invalid cursor.
+      if (!rIterator.isValid) throw NoSuchElementException()
+      return Bytes.wrap(rIterator.key()).also { rIterator.next() }
+    }
+  }
+
+  override suspend fun keys(): Iterable<Bytes> {
+    if (closed.get()) {
+      throw IllegalStateException("Closed DB")
+    }
+    // Open a fresh cursor per traversal; sharing one would leave the Iterable
+    // exhausted after its first pass.
+    return Iterable { BytesIterator(db.newIterator().apply { seekToFirst() }) }
+  }
+
   /**
    * Closes the underlying RocksDB instance.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index 6f5b2ed..aa07336 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -18,12 +18,13 @@ package org.apache.tuweni.kv
 
 import com.jolbox.bonecp.BoneCP
 import com.jolbox.bonecp.BoneCPConfig
-import kotlinx.coroutines.CoroutineDispatcher
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.guava.await
-import kotlinx.coroutines.withContext
 import org.apache.tuweni.bytes.Bytes
 import java.io.IOException
+import java.sql.ResultSet
+import java.sql.SQLException
+import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by a relational database.
@@ -32,7 +33,7 @@ import java.io.IOException
  * @param tableName the name of the table to use for storage.
  * @param keyColumn the key column of the store.
  * @param valueColumn the value column of the store.
- * @param dispatcher The co-routine context for blocking tasks.
+ * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a relational database backed key-value store.
@@ -44,7 +45,7 @@ constructor(
   val tableName: String = "store",
   val keyColumn: String = "key",
   val valueColumn: String = "value",
-  private val dispatcher: CoroutineDispatcher = Dispatchers.IO
+  override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : KeyValueStore {
 
   companion object {
@@ -84,7 +85,7 @@ constructor(
     connectionPool = BoneCP(config)
   }
 
-  override suspend fun get(key: Bytes): Bytes? = withContext(dispatcher) {
+  override suspend fun get(key: Bytes): Bytes? {
       connectionPool.asyncConnection.await().use {
         val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
         stmt.setBytes(1, key.toArrayUnsafe())
@@ -92,7 +93,7 @@ constructor(
 
         val rs = stmt.resultSet
 
-        if (rs.next()) {
+        return if (rs.next()) {
           Bytes.wrap(rs.getBytes(1))
         } else {
           null
@@ -100,16 +101,46 @@ constructor(
       }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) = withContext(dispatcher) {
+  override suspend fun put(key: Bytes, value: Bytes) {
     connectionPool.asyncConnection.await().use {
         val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)")
         stmt.setBytes(1, key.toArrayUnsafe())
         stmt.setBytes(2, value.toArrayUnsafe())
-        stmt.execute()
+        it.autoCommit = false
+        try {
+          stmt.execute()
+        } catch (e: SQLException) {
+          val updateStmt = it.prepareStatement("UPDATE $tableName SET $valueColumn=? WHERE $keyColumn=?")
+          updateStmt.setBytes(1, value.toArrayUnsafe())
+          updateStmt.setBytes(2, key.toArrayUnsafe())
+          updateStmt.execute()
+        }
+        it.commit()
         Unit
       }
   }
 
+  private class SQLIterator(val resultSet: ResultSet) : Iterator<Bytes> {
+    // Look one row ahead so hasNext() can answer without moving the cursor.
+    private var next = resultSet.next()
+
+    override fun hasNext(): Boolean = next
+
+    override fun next(): Bytes {
+      // Iterator contract: fail with NoSuchElementException rather than a SQLException.
+      if (!next) throw NoSuchElementException()
+      return Bytes.wrap(resultSet.getBytes(1)).also { next = resultSet.next() }
+    }
+  }
+
+  override suspend fun keys(): Iterable<Bytes> {
+    connectionPool.asyncConnection.await().use {
+      val stmt = it.prepareStatement("SELECT $keyColumn FROM $tableName")
+      // Drain the result set now: the connection is released as soon as `use` returns.
+      return SQLIterator(stmt.executeQuery()).asSequence().toList()
+    }
+  }
+
   /**
    * Closes the underlying connection pool.
    */
diff --git a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
index 202d58b..b5d8c2a 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
@@ -22,8 +22,11 @@ import org.apache.tuweni.junit.TempDirectory;
 import org.apache.tuweni.junit.TempDirectoryExtension;
 
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -51,6 +54,17 @@ class KeyValueStoreTest {
   }
 
   @Test
+  void testKeys() throws Exception {
+    Map<Bytes, Bytes> map = new HashMap<>();
+    KeyValueStore store = MapKeyValueStore.open(map);
+    AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
+    completion.join();
+    Set<Bytes> keys = new HashSet<>();
+    store.keysAsync().get().forEach(keys::add);
+    assertEquals(new HashSet<>(Arrays.asList(Bytes.of(123))), keys);
+  }
+
+  @Test
   void testLevelDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
     try (LevelDBKeyValueStore leveldb = LevelDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
       AsyncCompletion completion = leveldb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
index 1c46165..ae8c62b 100644
--- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
+++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
@@ -21,6 +21,7 @@ import com.google.common.io.RecursiveDeleteOption
 import com.winterbe.expekt.should
 import kotlinx.coroutines.runBlocking
 import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.kv.Vars.bar
 import org.apache.tuweni.kv.Vars.foo
 import org.apache.tuweni.kv.Vars.foobar
 import org.infinispan.Cache
@@ -37,6 +38,7 @@ import java.util.concurrent.RejectedExecutionException
 
 object Vars {
   val foo = Bytes.wrap("foo".toByteArray())!!
+  val bar = Bytes.wrap("bar".toByteArray())!!
   val foobar = Bytes.wrap("foobar".toByteArray())!!
 }
 
@@ -65,6 +67,14 @@ object KeyValueStoreSpec : Spek({
         kv.get(Bytes.wrap("foofoobar".toByteArray())).should.be.`null`
       }
     }
+
+    it("should iterate over keys") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.put(foo, bar)
+        kv.keys().should.equal(setOf(foobar, foo))
+      }
+    }
   }
 })
 
@@ -95,6 +105,16 @@ object InfinispanKeyValueStoreSpec : Spek({
         kv.get(Bytes.wrap("foofoobar".toByteArray())).should.be.`null`
       }
     }
+
+    it("should iterate over keys") {
+      runBlocking {
+        kv.put(bar, foo)
+        kv.put(foo, bar)
+          val keys = (kv.keys().map { it })
+        keys.should.contain(bar)
+        keys.should.contain(foo)
+      }
+    }
   }
 })
 
@@ -117,6 +137,14 @@ object MapDBKeyValueStoreSpec : Spek({
       }
     }
 
+    it("should iterate over keys") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.put(foo, bar)
+        kv.keys().should.equal(setOf(foobar, foo))
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = MapDBKeyValueStore(testDir.resolve("data2.db"))
       kv2.close()
@@ -160,6 +188,14 @@ object LevelDBKeyValueStoreSpec : Spek({
       }
     }
 
+    it("should iterate over keys") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.put(foo, bar)
+        setOf(foobar, foo).should.equal(kv.keys().map { it }.toSet())
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = LevelDBKeyValueStore(path.resolve("subdb"))
       kv2.close()
@@ -198,6 +234,16 @@ object RocksDBKeyValueStoreSpec : Spek({
       }
     }
 
+    it("should iterate over keys") {
+      runBlocking {
+        kv.put(bar, foo)
+        kv.put(foo, bar)
+        val keys = kv.keys().map { it }
+        keys.should.contain(bar)
+        keys.should.contain(foo)
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = RocksDBKeyValueStore(path.resolve("subdb"))
       kv2.close()
@@ -238,6 +284,14 @@ object SQLKeyValueStoreSpec : Spek({
       }
     }
 
+    it("should allow to update values") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.put(foobar, bar)
+        kv.get(foobar).should.equal(bar)
+      }
+    }
+
     it("should allow to retrieve values when configured with a different table") {
       runBlocking {
         otherkv.put(foobar, foo)
@@ -251,6 +305,16 @@ object SQLKeyValueStoreSpec : Spek({
       }
     }
 
+    it("should iterate over keys") {
+      runBlocking {
+        kv.put(bar, foo)
+        kv.put(foo, bar)
+        val keys = kv.keys().map { it }
+        keys.should.contain(bar)
+        keys.should.contain(foo)
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb")
       kv2.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org