You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/02/12 23:03:55 UTC

[incubator-tuweni] branch support_kv_store created (now 991aefd)

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a change to branch support_kv_store
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git.


      at 991aefd  Make key-value stores generic

This branch includes the following new commits:

     new 991aefd  Make key-value stores generic

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org


[incubator-tuweni] 01/01: Make key-value stores generic

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch support_kv_store
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit 991aefd5a97af667ce946f0701d44ed06702e27f
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed Feb 12 15:03:36 2020 -0800

    Make key-value stores generic
---
 dependency-versions.gradle                         |   1 +
 .../tuweni/eth/repository/BlockchainRepository.kt  |  16 +--
 kv/build.gradle                                    |   1 +
 .../apache/tuweni/kv/EntityManagerKeyValueStore.kt |  86 +++++++++++++++
 .../apache/tuweni/kv/InfinispanKeyValueStore.kt    |  16 +--
 .../kotlin/org/apache/tuweni/kv/KeyValueStore.kt   |  15 ++-
 .../org/apache/tuweni/kv/LevelDBKeyValueStore.kt   |  67 +++++++++---
 .../org/apache/tuweni/kv/MapDBKeyValueStore.kt     |  33 ++++--
 .../org/apache/tuweni/kv/MapKeyValueStore.kt       |  17 ++-
 .../org/apache/tuweni/kv/RedisKeyValueStore.kt     | 115 ++++++++++++++++++---
 .../org/apache/tuweni/kv/RocksDBKeyValueStore.kt   |  66 +++++++++---
 .../org/apache/tuweni/kv/SQLKeyValueStore.kt       |  74 +++++++++----
 .../org/apache/tuweni/kv/KeyValueStoreTest.java    |  23 ++++-
 .../apache/tuweni/kv/RedisKeyValueStoreTest.java   |  19 +++-
 .../org/apache/tuweni/kv/KeyValueStoreSpec.kt      |  72 +++++++++++--
 15 files changed, 505 insertions(+), 116 deletions(-)

diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index 9c3d571..fa8d1cd 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -29,6 +29,7 @@ dependencyManagement {
     dependency('info.picocli:picocli:4.0.0-alpha-2')
     dependency('io.lettuce:lettuce-core:5.1.3.RELEASE')
     dependency('io.vertx:vertx-core:3.6.2')
+    dependency('javax.persistence:javax.persistence-api:2.2')
     dependencySet(group: 'org.antlr', version: '4.7.1') {
       entry 'antlr4'
       entry 'antlr4-runtime'
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index b7c478e..9c4e30a 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -41,10 +41,10 @@ class BlockchainRepository
  * @param blockchainIndex the blockchain index to index values
  */
   (
-    private val chainMetadata: KeyValueStore,
-    private val blockBodyStore: KeyValueStore,
-    private val blockHeaderStore: KeyValueStore,
-    private val transactionReceiptsStore: KeyValueStore,
+    private val chainMetadata: KeyValueStore<Bytes, Bytes>,
+    private val blockBodyStore: KeyValueStore<Bytes, Bytes>,
+    private val blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+    private val transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
     private val blockchainIndex: BlockchainIndex
   ) {
 
@@ -58,10 +58,10 @@ class BlockchainRepository
      * @return a new blockchain repository made from the metadata passed in parameter.
      */
     suspend fun init(
-      blockBodyStore: KeyValueStore,
-      blockHeaderStore: KeyValueStore,
-      chainMetadata: KeyValueStore,
-      transactionReceiptsStore: KeyValueStore,
+      blockBodyStore: KeyValueStore<Bytes, Bytes>,
+      blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+      chainMetadata: KeyValueStore<Bytes, Bytes>,
+      transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
       blockchainIndex: BlockchainIndex,
       genesisBlock: Block
     ): BlockchainRepository {
diff --git a/kv/build.gradle b/kv/build.gradle
index 0f20ad0..d05b5c2 100644
--- a/kv/build.gradle
+++ b/kv/build.gradle
@@ -19,6 +19,7 @@ dependencies {
   compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava'
   compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
   compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
+  compile 'javax.persistence:javax.persistence-api'
   compileOnly 'com.jolbox:bonecp'
   compileOnly 'io.lettuce:lettuce-core'
   compileOnly 'org.fusesource.leveldbjni:leveldbjni-all'
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
new file mode 100644
index 0000000..8b653ad
--- /dev/null
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.kv
+
+import kotlin.jvm.Throws
+import kotlinx.coroutines.Dispatchers
+
+import java.io.IOException
+import java.util.function.Function
+import java.util.function.Supplier
+import java.util.stream.Stream
+import javax.persistence.EntityManager
+import kotlin.coroutines.CoroutineContext
+
+class EntityManagerKeyValueStore<K, V>
+        @Throws(IOException::class)
+        constructor(
+          private val entityManagerProvider: () -> EntityManager,
+          private val entityClass: Class<V>,
+          private val idAccessor: (V) -> K,
+          override val coroutineContext: CoroutineContext = Dispatchers.IO
+        ) : KeyValueStore<K, V> {
+
+  companion object {
+    /**
+     * Open a relational database backed key-value store using a JPA entity manager.
+     *
+     * @param entityManagerProvider The supplier of entity manager to operate.
+     * @return A key-value store.
+     * @throws IOException If an I/O error occurs.
+     */
+    @JvmStatic
+    @Throws(IOException::class)
+    fun <K, V> open(
+      entityManagerProvider: Supplier<EntityManager>,
+      entityClass: Class<V>,
+      idAccessor: Function<V, K>
+    ) = EntityManagerKeyValueStore<K, V>(entityManagerProvider::get, entityClass, idAccessor::apply)
+  }
+
+  override suspend fun get(key: K): V? {
+    val em = entityManagerProvider()
+    em.transaction.begin()
+    try {
+      return em.find(entityClass, key)
+    } finally {
+      em.transaction.commit()
+      em.close()
+    }
+  }
+
+  override suspend fun put(key: K, value: V) {
+    val em = entityManagerProvider()
+    em.transaction.begin()
+    try {
+      em.merge(value)
+    } finally {
+      em.transaction.commit()
+      em.close()
+    }
+  }
+
+  override suspend fun keys(): Iterable<K> {
+    val em = entityManagerProvider()
+    val query = em.createQuery(em.criteriaBuilder.createQuery(entityClass))
+    val resultStream: Stream<V> = query.resultStream
+    return Iterable { resultStream.map(idAccessor).iterator() }
+  }
+
+  override fun close() {
+  }
+}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 85a8b92..50e353f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -18,7 +18,7 @@ package org.apache.tuweni.kv
 
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.future.await
-import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
 import org.infinispan.Cache
 import kotlin.coroutines.CoroutineContext
 
@@ -26,10 +26,10 @@ import kotlin.coroutines.CoroutineContext
  * A key-value store backed by [Infinispan](https://infinispan.org)
  *
  */
-class InfinispanKeyValueStore constructor(
-  private val cache: Cache<Bytes, Bytes>,
+class InfinispanKeyValueStore<K, V> constructor(
+  private val cache: Cache<K, V>,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
 
@@ -40,16 +40,16 @@ class InfinispanKeyValueStore constructor(
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(cache: Cache<Bytes, Bytes>) = InfinispanKeyValueStore(cache)
+    fun <K, V> open(cache: Cache<K, V>) = InfinispanKeyValueStore(cache)
   }
 
-  override suspend fun get(key: Bytes): Bytes? = cache.getAsync(key).await()
+  override suspend fun get(key: K): V? = cache.getAsync(key).await()
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     cache.putAsync(key, value).await()
   }
 
-  override suspend fun keys(): Iterable<Bytes> = cache.keys
+  override suspend fun keys(): Iterable<K> = cache.keys
 
   /**
    * The cache is managed outside the scope of this key-value store.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 588eddf..7f2775a 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -17,7 +17,6 @@
 package org.apache.tuweni.kv
 
 import kotlinx.coroutines.CoroutineScope
-import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.AsyncResult
 import org.apache.tuweni.concurrent.coroutines.asyncCompletion
@@ -27,7 +26,7 @@ import java.io.Closeable
 /**
  * A key-value store.
  */
-interface KeyValueStore : Closeable, CoroutineScope {
+interface KeyValueStore<K, V> : Closeable, CoroutineScope {
 
   /**
    * Retrieves data from the store.
@@ -35,7 +34,7 @@ interface KeyValueStore : Closeable, CoroutineScope {
    * @param key The key for the content.
    * @return The stored data, or null if no data was stored under the specified key.
    */
-  suspend fun get(key: Bytes): Bytes?
+  suspend fun get(key: K): V?
 
   /**
    * Retrieves data from the store.
@@ -44,7 +43,7 @@ interface KeyValueStore : Closeable, CoroutineScope {
    * @return An [AsyncResult] that will complete with the stored content,
    *         or an empty optional if no content was available.
    */
-  fun getAsync(key: Bytes): AsyncResult<Bytes?> = asyncResult { get(key) }
+  fun getAsync(key: K): AsyncResult<V?> = asyncResult { get(key) }
 
   /**
    * Puts data into the store.
@@ -52,7 +51,7 @@ interface KeyValueStore : Closeable, CoroutineScope {
    * @param key The key to associate with the data, for use when retrieving.
    * @param value The data to store.
    */
-  suspend fun put(key: Bytes, value: Bytes)
+  suspend fun put(key: K, value: V)
 
   /**
    * Puts data into the store.
@@ -64,19 +63,19 @@ interface KeyValueStore : Closeable, CoroutineScope {
    * @param value The data to store.
    * @return An [AsyncCompletion] that will complete when the content is stored.
    */
-  fun putAsync(key: Bytes, value: Bytes): AsyncCompletion = asyncCompletion { put(key, value) }
+  fun putAsync(key: K, value: V): AsyncCompletion = asyncCompletion { put(key, value) }
 
   /**
    * Provides an iterator over the keys of the store.
    *
    * @return An [Iterable] allowing to iterate over the set of keys.
    */
-  suspend fun keys(): Iterable<Bytes>
+  suspend fun keys(): Iterable<K>
 
   /**
    * Provides an iterator over the keys of the store.
    *
    * @return An [Iterable] allowing to iterate over the set of keys.
    */
-  fun keysAsync(): AsyncResult<Iterable<Bytes>> = asyncResult { keys() }
+  fun keysAsync(): AsyncResult<Iterable<K>> = asyncResult { keys() }
 }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index 21ba81d..f0e8985 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -25,49 +25,91 @@ import org.iq80.leveldb.Options
 import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Path
+import java.util.function.Function
 import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by LevelDB.
  *
  * @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
  * @param options Options for the levelDB database.
  * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a LevelDB-backed key-value store.
  */
-class LevelDBKeyValueStore
+class LevelDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   dbPath: Path,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes) -> V,
   options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
+
     /**
      * Open a LevelDB-backed key-value store.
      *
      * @param dbPath The path to the levelDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path) = LevelDBKeyValueStore(dbPath)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ): LevelDBKeyValueStore<K, V> =
+      LevelDBKeyValueStore(dbPath,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply)
 
     /**
      * Open a LevelDB-backed key-value store.
      *
      * @param dbPath The path to the levelDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @param options Options for the levelDB database.
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path, options: Options) = LevelDBKeyValueStore(dbPath, options)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>,
+      options: Options
+    ) =
+      LevelDBKeyValueStore(dbPath,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply,
+        options)
   }
 
   private val db: DB
@@ -77,27 +119,28 @@ constructor(
     db = JniDBFactory.factory.open(dbPath.toFile(), options)
   }
 
-  override suspend fun get(key: Bytes): Bytes? {
-    val rawValue = db[key.toArrayUnsafe()]
+  override suspend fun get(key: K): V? {
+    val rawValue = db[keySerializer(key).toArrayUnsafe()]
     return if (rawValue == null) {
       null
     } else {
-      Bytes.wrap(rawValue)
+      valueDeserializer(Bytes.wrap(rawValue))
     }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) = db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+  override suspend fun put(key: K, value: V) = db.put(keySerializer(key).toArrayUnsafe(),
+    valueSerializer(value).toArrayUnsafe())
 
-  private class BytesIterator(val iter: DBIterator) : Iterator<Bytes> {
+  private class KIterator<K>(val iter: DBIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
     override fun hasNext(): Boolean = iter.hasNext()
 
-    override fun next(): Bytes = Bytes.wrap(iter.next().key)
+    override fun next(): K = keyDeserializer(Bytes.wrap(iter.next().key))
   }
 
-  override suspend fun keys(): Iterable<Bytes> {
+  override suspend fun keys(): Iterable<K> {
     val iter = db.iterator()
     iter.seekToFirst()
-    return Iterable { BytesIterator(iter) }
+    return Iterable { KIterator(iter, keyDeserializer) }
   }
 
   /**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index f5bf128..190108f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -32,29 +32,48 @@ import kotlin.coroutines.CoroutineContext
  * A key-value store backed by a MapDB instance.
  *
  * @param dbPath The path to the MapDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
  * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a MapDB-backed key-value store.
  */
-class MapDBKeyValueStore
+class MapDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   dbPath: Path,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes?) -> V?,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a MapDB-backed key-value store.
      *
      * @param dbPath The path to the MapDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path) = MapDBKeyValueStore(dbPath)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: (K) -> Bytes,
+      valueSerializer: (V) -> Bytes,
+      keyDeserializer: (Bytes) -> K,
+      valueDeserializer: (Bytes?) -> V?
+    ) =
+      MapDBKeyValueStore<K, V>(dbPath, keySerializer, valueSerializer, keyDeserializer, valueDeserializer)
   }
 
   private val db: DB
@@ -70,14 +89,14 @@ constructor(
     ).createOrOpen()
   }
 
-  override suspend fun get(key: Bytes): Bytes? = storageData[key]
+  override suspend fun get(key: K): V? = valueDeserializer(storageData[keySerializer(key)])
 
-  override suspend fun put(key: Bytes, value: Bytes) {
-    storageData[key] = value
+  override suspend fun put(key: K, value: V) {
+    storageData[keySerializer(key)] = valueSerializer(value)
     db.commit()
   }
 
-  override suspend fun keys(): Iterable<Bytes> = storageData.keys
+  override suspend fun keys(): Iterable<K> = storageData.keys.map(keyDeserializer)
 
   /**
    * Closes the underlying MapDB instance.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index bc15460..5eaca56 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -17,7 +17,6 @@
 package org.apache.tuweni.kv
 
 import kotlinx.coroutines.Dispatchers
-import org.apache.tuweni.bytes.Bytes
 import kotlin.coroutines.CoroutineContext
 
 /**
@@ -27,11 +26,11 @@ import kotlin.coroutines.CoroutineContext
  * @return A key-value store.
  * @constructor Open an in-memory key-value store.
  */
-class MapKeyValueStore
+class MapKeyValueStore<K, V>
 constructor(
-  private val map: MutableMap<Bytes, Bytes> = HashMap(),
+  private val map: MutableMap<K, V> = HashMap(),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
@@ -42,7 +41,7 @@ constructor(
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(): MapKeyValueStore = MapKeyValueStore()
+    fun <K, V> open() = MapKeyValueStore<K, V>()
 
     /**
      * Open an in-memory key-value store.
@@ -51,16 +50,16 @@ constructor(
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(map: MutableMap<Bytes, Bytes>) = MapKeyValueStore(map)
+    fun <K, V> open(map: MutableMap<K, V>) = MapKeyValueStore(map)
   }
 
-  override suspend fun get(key: Bytes): Bytes? = map[key]
+  override suspend fun get(key: K): V? = map[key]
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     map[key] = value
   }
 
-  override suspend fun keys(): Iterable<Bytes> = map.keys
+  override suspend fun keys(): Iterable<K> = map.keys
 
   /**
    * Has no effect in this KeyValueStore implementation.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index 8e8717c..f22f0cd 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -24,58 +24,131 @@ import io.lettuce.core.codec.RedisCodec
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.future.await
 import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
 import java.net.InetAddress
 import java.util.concurrent.CompletionStage
+import java.util.function.Function
 import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by Redis.
  *
  * @param uri The uri to the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
+ * @param coroutineContext the co-routine context in which this store executes
  * @constructor Open a Redis-backed key-value store.
  */
-class RedisKeyValueStore(
+class RedisKeyValueStore<K, V>(
   uri: String,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes) -> V,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a Redis-backed key-value store.
      *
      * @param uri The uri to the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(uri: String) = RedisKeyValueStore(uri)
+    fun <K, V> open(
+      uri: String,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) = RedisKeyValueStore(uri,
+      keySerializer::apply,
+      valueSerializer::apply,
+      keyDeserializer::apply,
+      valueDeserializer::apply)
 
     /**
      * Open a Redis-backed key-value store.
      *
      * @param port The port for the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(port: Int) = RedisKeyValueStore(port)
+    fun <K, V> open(
+      port: Int,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) =
+      RedisKeyValueStore(port = port,
+        keySerializer = keySerializer::apply,
+        valueSerializer = valueSerializer::apply,
+        keyDeserializer = keyDeserializer::apply,
+        valueDeserializer = valueDeserializer::apply)
 
     /**
      * Open a Redis-backed key-value store.
      *
      * @param address The address for the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(address: InetAddress) = RedisKeyValueStore(6379, address)
+    fun <K, V> open(
+      address: InetAddress,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) =
+      RedisKeyValueStore(6379,
+        address,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply)
 
     /**
      * Open a Redis-backed key-value store.
      *
      * @param port The port for the Redis store.
      * @param address The address for the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(port: Int, address: InetAddress) = RedisKeyValueStore(port, address)
+    fun <K, V> open(
+      port: Int,
+      address: InetAddress,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) =
+      RedisKeyValueStore(port,
+        address,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply)
 
     /**
      * A [RedisCodec] for working with Bytes classes.
@@ -94,12 +167,24 @@ class RedisKeyValueStore(
    *
    * @param port The port for the Redis store.
    * @param address The address for the Redis store.
+   * @param keySerializer the serializer of key objects to bytes
+   * @param valueSerializer the serializer of value objects to bytes
+   * @param keyDeserializer the deserializer of keys from bytes
+   * @param valueDeserializer the deserializer of values from bytes
    */
   @JvmOverloads
   constructor(
     port: Int = 6379,
-    address: InetAddress = InetAddress.getLoopbackAddress()
-  ) : this(RedisURI.create(address.hostAddress, port).toURI().toString())
+    address: InetAddress = InetAddress.getLoopbackAddress(),
+    keySerializer: (K) -> Bytes,
+    valueSerializer: (V) -> Bytes,
+    keyDeserializer: (Bytes) -> K,
+    valueDeserializer: (Bytes) -> V
+  ) : this(RedisURI.create(address.hostAddress, port).toURI().toString(),
+    keySerializer,
+    valueSerializer,
+    keyDeserializer,
+    valueDeserializer)
 
   init {
     val redisClient = RedisClient.create(uri)
@@ -107,14 +192,20 @@ class RedisKeyValueStore(
     asyncCommands = conn.async()
   }
 
-  override suspend fun get(key: Bytes): Bytes? = asyncCommands.get(key).await()
+  override suspend fun get(key: K): V? = asyncCommands.get(keySerializer(key)).thenApply {
+    if (it == null) {
+      null
+    } else {
+      valueDeserializer(it)
+    }
+  }.await()
 
-  override suspend fun put(key: Bytes, value: Bytes) {
-    val future: CompletionStage<String> = asyncCommands.set(key, value)
+  override suspend fun put(key: K, value: V) {
+    val future: CompletionStage<String> = asyncCommands.set(keySerializer(key), valueSerializer(value))
     future.await()
   }
 
-  override suspend fun keys(): Iterable<Bytes> = asyncCommands.keys(Bytes.EMPTY).await()
+  override suspend fun keys(): Iterable<K> = asyncCommands.keys(Bytes.EMPTY).await().map(keyDeserializer)
 
   override fun close() {
     conn.close()
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index 7911803..30e8dad 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -25,6 +25,7 @@ import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Function
 import kotlin.coroutines.CoroutineContext
 
 /**
@@ -37,37 +38,72 @@ import kotlin.coroutines.CoroutineContext
  * @throws IOException If an I/O error occurs.
  * @constructor Open a RocksDB-backed key-value store.
  */
-class RocksDBKeyValueStore
+class RocksDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   dbPath: Path,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes) -> V,
   options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a RocksDB-backed key-value store.
      *
      * @param dbPath The path to the RocksDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path) = RocksDBKeyValueStore(dbPath)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) = RocksDBKeyValueStore(dbPath,
+      keySerializer::apply,
+      valueSerializer::apply,
+      keyDeserializer::apply,
+      valueDeserializer::apply)
 
     /**
      * Open a RocksDB-backed key-value store.
      *
      * @param dbPath The path to the RocksDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @param options Options for the RocksDB database.
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path, options: Options) = RocksDBKeyValueStore(dbPath, options)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>,
+      options: Options
+    ) =
+      RocksDBKeyValueStore(dbPath,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply,
+        options)
   }
 
   private val db: RocksDB
@@ -79,43 +115,43 @@ constructor(
     db = RocksDB.open(options, dbPath.toAbsolutePath().toString())
   }
 
-  override suspend fun get(key: Bytes): Bytes? {
+  override suspend fun get(key: K): V? {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
-    val rawValue = db[key.toArrayUnsafe()]
+    val rawValue = db[keySerializer(key).toArrayUnsafe()]
     return if (rawValue == null) {
       null
     } else {
-      Bytes.wrap(rawValue)
+      valueDeserializer(Bytes.wrap(rawValue))
     }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
-    db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+    db.put(keySerializer(key).toArrayUnsafe(), valueSerializer(value).toArrayUnsafe())
   }
 
-  private class BytesIterator(val rIterator: RocksIterator) : Iterator<Bytes> {
+  private class BytesIterator<K>(val rIterator: RocksIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
 
     override fun hasNext(): Boolean = rIterator.isValid
 
-    override fun next(): Bytes {
-      val key = Bytes.wrap(rIterator.key())
+    override fun next(): K {
+      val key = rIterator.key()
       rIterator.next()
-      return key
+      return keyDeserializer(Bytes.wrap(key))
     }
   }
 
-  override suspend fun keys(): Iterable<Bytes> {
+  override suspend fun keys(): Iterable<K> {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
     val iter = db.newIterator()
     iter.seekToFirst()
-    return Iterable { BytesIterator(iter) }
+    return Iterable { BytesIterator(iter, keyDeserializer) }
   }
 
   /**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index aa07336..46c6e93 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -38,27 +38,45 @@ import kotlin.coroutines.CoroutineContext
  * @throws IOException If an I/O error occurs.
  * @constructor Open a relational database backed key-value store.
  */
-class SQLKeyValueStore
+class SQLKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   jdbcurl: String,
   val tableName: String = "store",
   val keyColumn: String = "key",
   val valueColumn: String = "value",
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes?) -> V?,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a relational database backed key-value store.
      *
      * @param jdbcUrl The JDBC url to connect to the database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(jdbcUrl: String) = SQLKeyValueStore(jdbcUrl)
+    fun <K, V> open(
+      jdbcUrl: String,
+      keySerializer: (K) -> Bytes,
+      valueSerializer: (V) -> Bytes,
+      keyDeserializer: (Bytes) -> K,
+      valueDeserializer: (Bytes?) -> V?
+    ) = SQLKeyValueStore<K, V>(jdbcurl = jdbcUrl,
+      keySerializer = keySerializer,
+      valueSerializer = valueSerializer,
+    keyDeserializer = keyDeserializer,
+      valueDeserializer = valueDeserializer)
 
     /**
      * Open a relational database backed key-value store.
@@ -67,13 +85,33 @@ constructor(
      * @param tableName the name of the table to use for storage.
      * @param keyColumn the key column of the store.
      * @param valueColumn the value column of the store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(jdbcUrl: String, tableName: String, keyColumn: String, valueColumn: String) =
-      SQLKeyValueStore(jdbcUrl, tableName, keyColumn, valueColumn)
+    fun <K, V> open(
+      jdbcUrl: String,
+      tableName: String,
+      keyColumn: String,
+      valueColumn: String,
+      keySerializer: (K) -> Bytes,
+      valueSerializer: (V) -> Bytes,
+      keyDeserializer: (Bytes) -> K,
+      valueDeserializer: (Bytes?) -> V?
+    ) =
+      SQLKeyValueStore<K, V>(jdbcUrl,
+        tableName,
+        keyColumn,
+        valueColumn,
+        keySerializer,
+        valueSerializer,
+        keyDeserializer,
+        valueDeserializer)
   }
 
   private val connectionPool: BoneCP
@@ -85,34 +123,34 @@ constructor(
     connectionPool = BoneCP(config)
   }
 
-  override suspend fun get(key: Bytes): Bytes? {
+  override suspend fun get(key: K): V? {
       connectionPool.asyncConnection.await().use {
         val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
-        stmt.setBytes(1, key.toArrayUnsafe())
+        stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
         stmt.execute()
 
         val rs = stmt.resultSet
 
         return if (rs.next()) {
-          Bytes.wrap(rs.getBytes(1))
+          valueDeserializer(Bytes.wrap(rs.getBytes(1)))
         } else {
           null
         }
       }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     connectionPool.asyncConnection.await().use {
         val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)")
-        stmt.setBytes(1, key.toArrayUnsafe())
-        stmt.setBytes(2, value.toArrayUnsafe())
+        stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
+        stmt.setBytes(2, valueSerializer(value).toArrayUnsafe())
         it.autoCommit = false
         try {
           stmt.execute()
         } catch (e: SQLException) {
           val updateStmt = it.prepareStatement("UPDATE $tableName SET $valueColumn=? WHERE $keyColumn=?")
-          updateStmt.setBytes(1, value.toArrayUnsafe())
-          updateStmt.setBytes(2, key.toArrayUnsafe())
+          updateStmt.setBytes(1, valueSerializer(value).toArrayUnsafe())
+          updateStmt.setBytes(2, keySerializer(key).toArrayUnsafe())
           updateStmt.execute()
         }
         it.commit()
@@ -120,24 +158,24 @@ constructor(
       }
   }
 
-  private class SQLIterator(val resultSet: ResultSet) : Iterator<Bytes> {
+  private class SQLIterator<K>(val resultSet: ResultSet, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
 
     private var next = resultSet.next()
 
     override fun hasNext(): Boolean = next
 
-    override fun next(): Bytes {
-      val key = Bytes.wrap(resultSet.getBytes(1))
+    override fun next(): K {
+      val key = keyDeserializer(Bytes.wrap(resultSet.getBytes(1)))
       next = resultSet.next()
       return key
     }
   }
 
-  override suspend fun keys(): Iterable<Bytes> {
+  override suspend fun keys(): Iterable<K> {
     connectionPool.asyncConnection.await().use {
       val stmt = it.prepareStatement("SELECT $keyColumn FROM $tableName")
       stmt.execute()
-      return Iterable { SQLIterator(stmt.resultSet) }
+      return Iterable { SQLIterator(stmt.resultSet, keyDeserializer) }
     }
   }
 
diff --git a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
index b5d8c2a..19e0f37 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -34,10 +35,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(TempDirectoryExtension.class)
 class KeyValueStoreTest {
 
+  private static final Function<Bytes, Bytes> bytesIdentityFn = Function.identity();
+
   @Test
   void testPutAndGet() throws Exception {
     Map<Bytes, Bytes> map = new HashMap<>();
-    KeyValueStore store = MapKeyValueStore.open(map);
+    KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
     AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
     completion.join();
     Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,14 +52,14 @@ class KeyValueStoreTest {
   @Test
   void testNoValue() throws Exception {
     Map<Bytes, Bytes> map = new HashMap<>();
-    KeyValueStore store = MapKeyValueStore.open(map);
+    KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
     assertNull(store.getAsync(Bytes.of(123)).get());
   }
 
   @Test
   void testKeys() throws Exception {
     Map<Bytes, Bytes> map = new HashMap<>();
-    KeyValueStore store = MapKeyValueStore.open(map);
+    KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
     AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
     completion.join();
     Set<Bytes> keys = new HashSet<>();
@@ -66,7 +69,12 @@ class KeyValueStoreTest {
 
   @Test
   void testLevelDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
-    try (LevelDBKeyValueStore leveldb = LevelDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+    try (LevelDBKeyValueStore<Bytes, Bytes> leveldb = LevelDBKeyValueStore.open(
+        tempDirectory.resolve("foo").resolve("bar"),
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn)) {
       AsyncCompletion completion = leveldb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
       completion.join();
       Bytes value = leveldb.getAsync(Bytes.of(123)).get();
@@ -77,7 +85,12 @@ class KeyValueStoreTest {
 
   @Test
   void testRocksDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
-    try (RocksDBKeyValueStore rocksdb = RocksDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+    try (RocksDBKeyValueStore<Bytes, Bytes> rocksdb = RocksDBKeyValueStore.open(
+        tempDirectory.resolve("foo").resolve("bar"),
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn)) {
       AsyncCompletion completion = rocksdb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
       completion.join();
       Bytes value = rocksdb.getAsync(Bytes.of(123)).get();
diff --git a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
index c218a65..db44ae1 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.tuweni.junit.RedisPort;
 import org.apache.tuweni.junit.RedisServerExtension;
 
 import java.net.InetAddress;
+import java.util.function.Function;
 
 import io.lettuce.core.RedisClient;
 import io.lettuce.core.RedisURI;
@@ -34,7 +35,8 @@ class RedisKeyValueStoreTest {
 
   @Test
   void testPutAndGet(@RedisPort Integer redisPort) throws Exception {
-    KeyValueStore store = RedisKeyValueStore.open(redisPort);
+    KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore
+        .open(redisPort, Function.identity(), Function.identity(), Function.identity(), Function.identity());
     AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
     completion.join();
     Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,13 +51,24 @@ class RedisKeyValueStoreTest {
 
   @Test
   void testNoValue(@RedisPort Integer redisPort) throws Exception {
-    KeyValueStore store = RedisKeyValueStore.open(redisPort, InetAddress.getLoopbackAddress());
+    KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore.open(
+        redisPort,
+        InetAddress.getLoopbackAddress(),
+        Function.identity(),
+        Function.identity(),
+        Function.identity(),
+        Function.identity());
     assertNull(store.getAsync(Bytes.of(124)).get());
   }
 
   @Test
   void testRedisCloseable(@RedisPort Integer redisPort) throws Exception {
-    try (RedisKeyValueStore redis = RedisKeyValueStore.open("redis://127.0.0.1:" + redisPort)) {
+    try (RedisKeyValueStore<Bytes, Bytes> redis = RedisKeyValueStore.open(
+        "redis://127.0.0.1:" + redisPort,
+        Function.identity(),
+        Function.identity(),
+        Function.identity(),
+        Function.identity())) {
       AsyncCompletion completion = redis.putAsync(Bytes.of(125), Bytes.of(10, 12, 13));
       completion.join();
       Bytes value = redis.getAsync(Bytes.of(125)).get();
diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
index ae8c62b..94f4ffb 100644
--- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
+++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
@@ -110,7 +110,7 @@ object InfinispanKeyValueStoreSpec : Spek({
       runBlocking {
         kv.put(bar, foo)
         kv.put(foo, bar)
-          val keys = (kv.keys().map { it })
+        val keys = (kv.keys().map { it })
         keys.should.contain(bar)
         keys.should.contain(foo)
       }
@@ -120,7 +120,12 @@ object InfinispanKeyValueStoreSpec : Spek({
 
 object MapDBKeyValueStoreSpec : Spek({
   val testDir = Files.createTempDirectory("data")
-  val kv = MapDBKeyValueStore(testDir.resolve("data.db"))
+  val kv = MapDBKeyValueStore(
+    testDir.resolve("data.db"),
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
 
   describe("a MapDB-backed key value store") {
 
@@ -141,12 +146,19 @@ object MapDBKeyValueStoreSpec : Spek({
       runBlocking {
         kv.put(foobar, foo)
         kv.put(foo, bar)
-        kv.keys().should.equal(setOf(foobar, foo))
+        val keys = kv.keys().map { it }
+        keys.should.contain(foo)
+        keys.should.contain(foobar)
       }
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = MapDBKeyValueStore(testDir.resolve("data2.db"))
+      val kv2 = MapDBKeyValueStore(
+        testDir.resolve("data2.db"),
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false
@@ -168,7 +180,12 @@ object MapDBKeyValueStoreSpec : Spek({
 
 object LevelDBKeyValueStoreSpec : Spek({
   val path = Files.createTempDirectory("leveldb")
-  val kv = LevelDBKeyValueStore(path)
+  val kv = LevelDBKeyValueStore(
+    path,
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
   afterGroup {
     kv.close()
     MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -197,7 +214,12 @@ object LevelDBKeyValueStoreSpec : Spek({
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = LevelDBKeyValueStore(path.resolve("subdb"))
+      val kv2 = LevelDBKeyValueStore(
+        path.resolve("subdb"),
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false
@@ -214,7 +236,12 @@ object LevelDBKeyValueStoreSpec : Spek({
 
 object RocksDBKeyValueStoreSpec : Spek({
   val path = Files.createTempDirectory("rocksdb")
-  val kv = RocksDBKeyValueStore(path)
+  val kv = RocksDBKeyValueStore(
+    path,
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
   afterGroup {
     kv.close()
     MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -245,7 +272,12 @@ object RocksDBKeyValueStoreSpec : Spek({
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = RocksDBKeyValueStore(path.resolve("subdb"))
+      val kv2 = RocksDBKeyValueStore(
+        path.resolve("subdb"),
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false
@@ -269,8 +301,21 @@ object SQLKeyValueStoreSpec : Spek({
     st.executeUpdate("create table store(key binary, value binary, primary key(key))")
     st.executeUpdate("create table store2(id binary, val binary, primary key(id))")
   }
-  val kv = SQLKeyValueStore(jdbcUrl)
-  val otherkv = SQLKeyValueStore.open(jdbcUrl, "store2", "id", "val")
+  val kv = SQLKeyValueStore(
+    jdbcUrl,
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
+  val otherkv = SQLKeyValueStore.open(
+    jdbcUrl,
+    "store2",
+    "id",
+    "val",
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
   afterGroup {
     kv.close()
     otherkv.close()
@@ -316,7 +361,12 @@ object SQLKeyValueStoreSpec : Spek({
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb")
+      val kv2 = SQLKeyValueStore(
+        "jdbc:h2:mem:testdb",
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org