You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/02/12 23:03:55 UTC
[incubator-tuweni] branch support_kv_store created (now 991aefd)
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a change to branch support_kv_store
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git.
at 991aefd Make key-value stores generic
This branch includes the following new commits:
new 991aefd Make key-value stores generic
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org
[incubator-tuweni] 01/01: Make key-value stores generic
Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch support_kv_store
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 991aefd5a97af667ce946f0701d44ed06702e27f
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed Feb 12 15:03:36 2020 -0800
Make key-value stores generic
---
dependency-versions.gradle | 1 +
.../tuweni/eth/repository/BlockchainRepository.kt | 16 +--
kv/build.gradle | 1 +
.../apache/tuweni/kv/EntityManagerKeyValueStore.kt | 86 +++++++++++++++
.../apache/tuweni/kv/InfinispanKeyValueStore.kt | 16 +--
.../kotlin/org/apache/tuweni/kv/KeyValueStore.kt | 15 ++-
.../org/apache/tuweni/kv/LevelDBKeyValueStore.kt | 67 +++++++++---
.../org/apache/tuweni/kv/MapDBKeyValueStore.kt | 33 ++++--
.../org/apache/tuweni/kv/MapKeyValueStore.kt | 17 ++-
.../org/apache/tuweni/kv/RedisKeyValueStore.kt | 115 ++++++++++++++++++---
.../org/apache/tuweni/kv/RocksDBKeyValueStore.kt | 66 +++++++++---
.../org/apache/tuweni/kv/SQLKeyValueStore.kt | 74 +++++++++----
.../org/apache/tuweni/kv/KeyValueStoreTest.java | 23 ++++-
.../apache/tuweni/kv/RedisKeyValueStoreTest.java | 19 +++-
.../org/apache/tuweni/kv/KeyValueStoreSpec.kt | 72 +++++++++++--
15 files changed, 505 insertions(+), 116 deletions(-)
diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index 9c3d571..fa8d1cd 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -29,6 +29,7 @@ dependencyManagement {
dependency('info.picocli:picocli:4.0.0-alpha-2')
dependency('io.lettuce:lettuce-core:5.1.3.RELEASE')
dependency('io.vertx:vertx-core:3.6.2')
+ dependency('javax.persistence:javax.persistence-api:2.2')
dependencySet(group: 'org.antlr', version: '4.7.1') {
entry 'antlr4'
entry 'antlr4-runtime'
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index b7c478e..9c4e30a 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -41,10 +41,10 @@ class BlockchainRepository
* @param blockchainIndex the blockchain index to index values
*/
(
- private val chainMetadata: KeyValueStore,
- private val blockBodyStore: KeyValueStore,
- private val blockHeaderStore: KeyValueStore,
- private val transactionReceiptsStore: KeyValueStore,
+ private val chainMetadata: KeyValueStore<Bytes, Bytes>,
+ private val blockBodyStore: KeyValueStore<Bytes, Bytes>,
+ private val blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+ private val transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
private val blockchainIndex: BlockchainIndex
) {
@@ -58,10 +58,10 @@ class BlockchainRepository
* @return a new blockchain repository made from the metadata passed in parameter.
*/
suspend fun init(
- blockBodyStore: KeyValueStore,
- blockHeaderStore: KeyValueStore,
- chainMetadata: KeyValueStore,
- transactionReceiptsStore: KeyValueStore,
+ blockBodyStore: KeyValueStore<Bytes, Bytes>,
+ blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+ chainMetadata: KeyValueStore<Bytes, Bytes>,
+ transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
blockchainIndex: BlockchainIndex,
genesisBlock: Block
): BlockchainRepository {
diff --git a/kv/build.gradle b/kv/build.gradle
index 0f20ad0..d05b5c2 100644
--- a/kv/build.gradle
+++ b/kv/build.gradle
@@ -19,6 +19,7 @@ dependencies {
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
+ compile 'javax.persistence:javax.persistence-api'
compileOnly 'com.jolbox:bonecp'
compileOnly 'io.lettuce:lettuce-core'
compileOnly 'org.fusesource.leveldbjni:leveldbjni-all'
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
new file mode 100644
index 0000000..8b653ad
--- /dev/null
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.kv
+
+import kotlin.jvm.Throws
+import kotlinx.coroutines.Dispatchers
+
+import java.io.IOException
+import java.util.function.Function
+import java.util.function.Supplier
+import java.util.stream.Stream
+import javax.persistence.EntityManager
+import kotlin.coroutines.CoroutineContext
+
+class EntityManagerKeyValueStore<K, V>
+ @Throws(IOException::class)
+ constructor(
+ private val entityManagerProvider: () -> EntityManager,
+ private val entityClass: Class<V>,
+ private val idAccessor: (V) -> K,
+ override val coroutineContext: CoroutineContext = Dispatchers.IO
+ ) : KeyValueStore<K, V> {
+
+ companion object {
+ /**
+ * Open a relational database backed key-value store using a JPA entity manager.
+ *
+ * @param entityManagerProvider The supplier of entity manager to operate.
+ * @return A key-value store.
+ * @throws IOException If an I/O error occurs.
+ */
+ @JvmStatic
+ @Throws(IOException::class)
+ fun <K, V> open(
+ entityManagerProvider: Supplier<EntityManager>,
+ entityClass: Class<V>,
+ idAccessor: Function<V, K>
+ ) = EntityManagerKeyValueStore<K, V>(entityManagerProvider::get, entityClass, idAccessor::apply)
+ }
+
+ override suspend fun get(key: K): V? {
+ val em = entityManagerProvider()
+ em.transaction.begin()
+ try {
+ return em.find(entityClass, key)
+ } finally {
+ em.transaction.commit()
+ em.close()
+ }
+ }
+
+ override suspend fun put(key: K, value: V) {
+ val em = entityManagerProvider()
+ em.transaction.begin()
+ try {
+ em.merge(value)
+ } finally {
+ em.transaction.commit()
+ em.close()
+ }
+ }
+
+ override suspend fun keys(): Iterable<K> {
+ val em = entityManagerProvider()
+ val query = em.createQuery(em.criteriaBuilder.createQuery(entityClass))
+ val resultStream: Stream<V> = query.resultStream
+ return Iterable { resultStream.map(idAccessor).iterator() }
+ }
+
+ override fun close() {
+ }
+}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 85a8b92..50e353f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -18,7 +18,7 @@ package org.apache.tuweni.kv
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.future.await
-import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
import org.infinispan.Cache
import kotlin.coroutines.CoroutineContext
@@ -26,10 +26,10 @@ import kotlin.coroutines.CoroutineContext
* A key-value store backed by [Infinispan](https://infinispan.org)
*
*/
-class InfinispanKeyValueStore constructor(
- private val cache: Cache<Bytes, Bytes>,
+class InfinispanKeyValueStore<K, V> constructor(
+ private val cache: Cache<K, V>,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
@@ -40,16 +40,16 @@ class InfinispanKeyValueStore constructor(
* @return A key-value store.
*/
@JvmStatic
- fun open(cache: Cache<Bytes, Bytes>) = InfinispanKeyValueStore(cache)
+ fun <K, V> open(cache: Cache<K, V>) = InfinispanKeyValueStore(cache)
}
- override suspend fun get(key: Bytes): Bytes? = cache.getAsync(key).await()
+ override suspend fun get(key: K): V? = cache.getAsync(key).await()
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
cache.putAsync(key, value).await()
}
- override suspend fun keys(): Iterable<Bytes> = cache.keys
+ override suspend fun keys(): Iterable<K> = cache.keys
/**
* The cache is managed outside the scope of this key-value store.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 588eddf..7f2775a 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -17,7 +17,6 @@
package org.apache.tuweni.kv
import kotlinx.coroutines.CoroutineScope
-import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.AsyncResult
import org.apache.tuweni.concurrent.coroutines.asyncCompletion
@@ -27,7 +26,7 @@ import java.io.Closeable
/**
* A key-value store.
*/
-interface KeyValueStore : Closeable, CoroutineScope {
+interface KeyValueStore<K, V> : Closeable, CoroutineScope {
/**
* Retrieves data from the store.
@@ -35,7 +34,7 @@ interface KeyValueStore : Closeable, CoroutineScope {
* @param key The key for the content.
* @return The stored data, or null if no data was stored under the specified key.
*/
- suspend fun get(key: Bytes): Bytes?
+ suspend fun get(key: K): V?
/**
* Retrieves data from the store.
@@ -44,7 +43,7 @@ interface KeyValueStore : Closeable, CoroutineScope {
* @return An [AsyncResult] that will complete with the stored content,
* or an empty optional if no content was available.
*/
- fun getAsync(key: Bytes): AsyncResult<Bytes?> = asyncResult { get(key) }
+ fun getAsync(key: K): AsyncResult<V?> = asyncResult { get(key) }
/**
* Puts data into the store.
@@ -52,7 +51,7 @@ interface KeyValueStore : Closeable, CoroutineScope {
* @param key The key to associate with the data, for use when retrieving.
* @param value The data to store.
*/
- suspend fun put(key: Bytes, value: Bytes)
+ suspend fun put(key: K, value: V)
/**
* Puts data into the store.
@@ -64,19 +63,19 @@ interface KeyValueStore : Closeable, CoroutineScope {
* @param value The data to store.
* @return An [AsyncCompletion] that will complete when the content is stored.
*/
- fun putAsync(key: Bytes, value: Bytes): AsyncCompletion = asyncCompletion { put(key, value) }
+ fun putAsync(key: K, value: V): AsyncCompletion = asyncCompletion { put(key, value) }
/**
* Provides an iterator over the keys of the store.
*
* @return An [Iterable] allowing to iterate over the set of keys.
*/
- suspend fun keys(): Iterable<Bytes>
+ suspend fun keys(): Iterable<K>
/**
* Provides an iterator over the keys of the store.
*
* @return An [Iterable] allowing to iterate over the set of keys.
*/
- fun keysAsync(): AsyncResult<Iterable<Bytes>> = asyncResult { keys() }
+ fun keysAsync(): AsyncResult<Iterable<K>> = asyncResult { keys() }
}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index 21ba81d..f0e8985 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -25,49 +25,91 @@ import org.iq80.leveldb.Options
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
+import java.util.function.Function
import kotlin.coroutines.CoroutineContext
/**
* A key-value store backed by LevelDB.
*
* @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param options Options for the levelDB database.
* @param coroutineContext The co-routine context for blocking tasks.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
* @constructor Open a LevelDB-backed key-value store.
*/
-class LevelDBKeyValueStore
+class LevelDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
dbPath: Path,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes) -> V,
options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
+
/**
* Open a LevelDB-backed key-value store.
*
* @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path) = LevelDBKeyValueStore(dbPath)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ): LevelDBKeyValueStore<K, V> =
+ LevelDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a LevelDB-backed key-value store.
*
* @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param options Options for the levelDB database.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path, options: Options) = LevelDBKeyValueStore(dbPath, options)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>,
+ options: Options
+ ) =
+ LevelDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply,
+ options)
}
private val db: DB
@@ -77,27 +119,28 @@ constructor(
db = JniDBFactory.factory.open(dbPath.toFile(), options)
}
- override suspend fun get(key: Bytes): Bytes? {
- val rawValue = db[key.toArrayUnsafe()]
+ override suspend fun get(key: K): V? {
+ val rawValue = db[keySerializer(key).toArrayUnsafe()]
return if (rawValue == null) {
null
} else {
- Bytes.wrap(rawValue)
+ valueDeserializer(Bytes.wrap(rawValue))
}
}
- override suspend fun put(key: Bytes, value: Bytes) = db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+ override suspend fun put(key: K, value: V) = db.put(keySerializer(key).toArrayUnsafe(),
+ valueSerializer(value).toArrayUnsafe())
- private class BytesIterator(val iter: DBIterator) : Iterator<Bytes> {
+ private class KIterator<K>(val iter: DBIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
override fun hasNext(): Boolean = iter.hasNext()
- override fun next(): Bytes = Bytes.wrap(iter.next().key)
+ override fun next(): K = keyDeserializer(Bytes.wrap(iter.next().key))
}
- override suspend fun keys(): Iterable<Bytes> {
+ override suspend fun keys(): Iterable<K> {
val iter = db.iterator()
iter.seekToFirst()
- return Iterable { BytesIterator(iter) }
+ return Iterable { KIterator(iter, keyDeserializer) }
}
/**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index f5bf128..190108f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -32,29 +32,48 @@ import kotlin.coroutines.CoroutineContext
* A key-value store backed by a MapDB instance.
*
* @param dbPath The path to the MapDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param coroutineContext The co-routine context for blocking tasks.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
* @constructor Open a MapDB-backed key-value store.
*/
-class MapDBKeyValueStore
+class MapDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
dbPath: Path,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes?) -> V?,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a MapDB-backed key-value store.
*
* @param dbPath The path to the MapDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path) = MapDBKeyValueStore(dbPath)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes?) -> V?
+ ) =
+ MapDBKeyValueStore<K, V>(dbPath, keySerializer, valueSerializer, keyDeserializer, valueDeserializer)
}
private val db: DB
@@ -70,14 +89,14 @@ constructor(
).createOrOpen()
}
- override suspend fun get(key: Bytes): Bytes? = storageData[key]
+ override suspend fun get(key: K): V? = valueDeserializer(storageData[keySerializer(key)])
- override suspend fun put(key: Bytes, value: Bytes) {
- storageData[key] = value
+ override suspend fun put(key: K, value: V) {
+ storageData[keySerializer(key)] = valueSerializer(value)
db.commit()
}
- override suspend fun keys(): Iterable<Bytes> = storageData.keys
+ override suspend fun keys(): Iterable<K> = storageData.keys.map(keyDeserializer)
/**
* Closes the underlying MapDB instance.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index bc15460..5eaca56 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -17,7 +17,6 @@
package org.apache.tuweni.kv
import kotlinx.coroutines.Dispatchers
-import org.apache.tuweni.bytes.Bytes
import kotlin.coroutines.CoroutineContext
/**
@@ -27,11 +26,11 @@ import kotlin.coroutines.CoroutineContext
* @return A key-value store.
* @constructor Open an in-memory key-value store.
*/
-class MapKeyValueStore
+class MapKeyValueStore<K, V>
constructor(
- private val map: MutableMap<Bytes, Bytes> = HashMap(),
+ private val map: MutableMap<K, V> = HashMap(),
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
@@ -42,7 +41,7 @@ constructor(
* @return A key-value store.
*/
@JvmStatic
- fun open(): MapKeyValueStore = MapKeyValueStore()
+ fun <K, V> open() = MapKeyValueStore<K, V>()
/**
* Open an in-memory key-value store.
@@ -51,16 +50,16 @@ constructor(
* @return A key-value store.
*/
@JvmStatic
- fun open(map: MutableMap<Bytes, Bytes>) = MapKeyValueStore(map)
+ fun <K, V> open(map: MutableMap<K, V>) = MapKeyValueStore(map)
}
- override suspend fun get(key: Bytes): Bytes? = map[key]
+ override suspend fun get(key: K): V? = map[key]
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
map[key] = value
}
- override suspend fun keys(): Iterable<Bytes> = map.keys
+ override suspend fun keys(): Iterable<K> = map.keys
/**
* Has no effect in this KeyValueStore implementation.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index 8e8717c..f22f0cd 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -24,58 +24,131 @@ import io.lettuce.core.codec.RedisCodec
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.future.await
import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
import java.net.InetAddress
import java.util.concurrent.CompletionStage
+import java.util.function.Function
import kotlin.coroutines.CoroutineContext
/**
* A key-value store backed by Redis.
*
* @param uri The uri to the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
+ * @param coroutineContext the co-routine context in which this store executes
* @constructor Open a Redis-backed key-value store.
*/
-class RedisKeyValueStore(
+class RedisKeyValueStore<K, V>(
uri: String,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes) -> V,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a Redis-backed key-value store.
*
* @param uri The uri to the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(uri: String) = RedisKeyValueStore(uri)
+ fun <K, V> open(
+ uri: String,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) = RedisKeyValueStore(uri,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a Redis-backed key-value store.
*
* @param port The port for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(port: Int) = RedisKeyValueStore(port)
+ fun <K, V> open(
+ port: Int,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) =
+ RedisKeyValueStore(port = port,
+ keySerializer = keySerializer::apply,
+ valueSerializer = valueSerializer::apply,
+ keyDeserializer = keyDeserializer::apply,
+ valueDeserializer = valueDeserializer::apply)
/**
* Open a Redis-backed key-value store.
*
* @param address The address for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(address: InetAddress) = RedisKeyValueStore(6379, address)
+ fun <K, V> open(
+ address: InetAddress,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) =
+ RedisKeyValueStore(6379,
+ address,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a Redis-backed key-value store.
*
* @param port The port for the Redis store.
* @param address The address for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(port: Int, address: InetAddress) = RedisKeyValueStore(port, address)
+ fun <K, V> open(
+ port: Int,
+ address: InetAddress,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) =
+ RedisKeyValueStore(port,
+ address,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* A [RedisCodec] for working with Bytes classes.
@@ -94,12 +167,24 @@ class RedisKeyValueStore(
*
* @param port The port for the Redis store.
* @param address The address for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
*/
@JvmOverloads
constructor(
port: Int = 6379,
- address: InetAddress = InetAddress.getLoopbackAddress()
- ) : this(RedisURI.create(address.hostAddress, port).toURI().toString())
+ address: InetAddress = InetAddress.getLoopbackAddress(),
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes) -> V
+ ) : this(RedisURI.create(address.hostAddress, port).toURI().toString(),
+ keySerializer,
+ valueSerializer,
+ keyDeserializer,
+ valueDeserializer)
init {
val redisClient = RedisClient.create(uri)
@@ -107,14 +192,20 @@ class RedisKeyValueStore(
asyncCommands = conn.async()
}
- override suspend fun get(key: Bytes): Bytes? = asyncCommands.get(key).await()
+ override suspend fun get(key: K): V? = asyncCommands.get(keySerializer(key)).thenApply {
+ if (it == null) {
+ null
+ } else {
+ valueDeserializer(it)
+ }
+ }.await()
- override suspend fun put(key: Bytes, value: Bytes) {
- val future: CompletionStage<String> = asyncCommands.set(key, value)
+ override suspend fun put(key: K, value: V) {
+ val future: CompletionStage<String> = asyncCommands.set(keySerializer(key), valueSerializer(value))
future.await()
}
- override suspend fun keys(): Iterable<Bytes> = asyncCommands.keys(Bytes.EMPTY).await()
+ override suspend fun keys(): Iterable<K> = asyncCommands.keys(Bytes.EMPTY).await().map(keyDeserializer)
override fun close() {
conn.close()
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index 7911803..30e8dad 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -25,6 +25,7 @@ import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Function
import kotlin.coroutines.CoroutineContext
/**
@@ -37,37 +38,72 @@ import kotlin.coroutines.CoroutineContext
* @throws IOException If an I/O error occurs.
* @constructor Open a RocksDB-backed key-value store.
*/
-class RocksDBKeyValueStore
+class RocksDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
dbPath: Path,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes) -> V,
options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a RocksDB-backed key-value store.
*
* @param dbPath The path to the RocksDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path) = RocksDBKeyValueStore(dbPath)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) = RocksDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a RocksDB-backed key-value store.
*
* @param dbPath The path to the RocksDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param options Options for the RocksDB database.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path, options: Options) = RocksDBKeyValueStore(dbPath, options)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>,
+ options: Options
+ ) =
+ RocksDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply,
+ options)
}
private val db: RocksDB
@@ -79,43 +115,43 @@ constructor(
db = RocksDB.open(options, dbPath.toAbsolutePath().toString())
}
- override suspend fun get(key: Bytes): Bytes? {
+ override suspend fun get(key: K): V? {
if (closed.get()) {
throw IllegalStateException("Closed DB")
}
- val rawValue = db[key.toArrayUnsafe()]
+ val rawValue = db[keySerializer(key).toArrayUnsafe()]
return if (rawValue == null) {
null
} else {
- Bytes.wrap(rawValue)
+ valueDeserializer(Bytes.wrap(rawValue))
}
}
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
if (closed.get()) {
throw IllegalStateException("Closed DB")
}
- db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+ db.put(keySerializer(key).toArrayUnsafe(), valueSerializer(value).toArrayUnsafe())
}
- private class BytesIterator(val rIterator: RocksIterator) : Iterator<Bytes> {
+ private class BytesIterator<K>(val rIterator: RocksIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
override fun hasNext(): Boolean = rIterator.isValid
- override fun next(): Bytes {
- val key = Bytes.wrap(rIterator.key())
+ override fun next(): K {
+ val key = rIterator.key()
rIterator.next()
- return key
+ return keyDeserializer(Bytes.wrap(key))
}
}
- override suspend fun keys(): Iterable<Bytes> {
+ override suspend fun keys(): Iterable<K> {
if (closed.get()) {
throw IllegalStateException("Closed DB")
}
val iter = db.newIterator()
iter.seekToFirst()
- return Iterable { BytesIterator(iter) }
+ return Iterable { BytesIterator(iter, keyDeserializer) }
}
/**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index aa07336..46c6e93 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -38,27 +38,45 @@ import kotlin.coroutines.CoroutineContext
* @throws IOException If an I/O error occurs.
* @constructor Open a relational database backed key-value store.
*/
-class SQLKeyValueStore
+class SQLKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
jdbcurl: String,
val tableName: String = "store",
val keyColumn: String = "key",
val valueColumn: String = "value",
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes?) -> V?,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a relational database backed key-value store.
*
* @param jdbcUrl The JDBC url to connect to the database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(jdbcUrl: String) = SQLKeyValueStore(jdbcUrl)
+ fun <K, V> open(
+ jdbcUrl: String,
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes?) -> V?
+ ) = SQLKeyValueStore<K, V>(jdbcurl = jdbcUrl,
+ keySerializer = keySerializer,
+ valueSerializer = valueSerializer,
+ keyDeserializer = keyDeserializer,
+ valueDeserializer = valueDeserializer)
/**
* Open a relational database backed key-value store.
@@ -67,13 +85,33 @@ constructor(
* @param tableName the name of the table to use for storage.
* @param keyColumn the key column of the store.
* @param valueColumn the value column of the store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(jdbcUrl: String, tableName: String, keyColumn: String, valueColumn: String) =
- SQLKeyValueStore(jdbcUrl, tableName, keyColumn, valueColumn)
+ fun <K, V> open(
+ jdbcUrl: String,
+ tableName: String,
+ keyColumn: String,
+ valueColumn: String,
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes?) -> V?
+ ) =
+ SQLKeyValueStore<K, V>(jdbcUrl,
+ tableName,
+ keyColumn,
+ valueColumn,
+ keySerializer,
+ valueSerializer,
+ keyDeserializer,
+ valueDeserializer)
}
private val connectionPool: BoneCP
@@ -85,34 +123,34 @@ constructor(
connectionPool = BoneCP(config)
}
- override suspend fun get(key: Bytes): Bytes? {
+ override suspend fun get(key: K): V? {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
- stmt.setBytes(1, key.toArrayUnsafe())
+ stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
stmt.execute()
val rs = stmt.resultSet
return if (rs.next()) {
- Bytes.wrap(rs.getBytes(1))
+ valueDeserializer(Bytes.wrap(rs.getBytes(1)))
} else {
null
}
}
}
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)")
- stmt.setBytes(1, key.toArrayUnsafe())
- stmt.setBytes(2, value.toArrayUnsafe())
+ stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
+ stmt.setBytes(2, valueSerializer(value).toArrayUnsafe())
it.autoCommit = false
try {
stmt.execute()
} catch (e: SQLException) {
val updateStmt = it.prepareStatement("UPDATE $tableName SET $valueColumn=? WHERE $keyColumn=?")
- updateStmt.setBytes(1, value.toArrayUnsafe())
- updateStmt.setBytes(2, key.toArrayUnsafe())
+ updateStmt.setBytes(1, valueSerializer(value).toArrayUnsafe())
+ updateStmt.setBytes(2, keySerializer(key).toArrayUnsafe())
updateStmt.execute()
}
it.commit()
@@ -120,24 +158,24 @@ constructor(
}
}
- private class SQLIterator(val resultSet: ResultSet) : Iterator<Bytes> {
+ private class SQLIterator<K>(val resultSet: ResultSet, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
private var next = resultSet.next()
override fun hasNext(): Boolean = next
- override fun next(): Bytes {
- val key = Bytes.wrap(resultSet.getBytes(1))
+ override fun next(): K {
+ val key = keyDeserializer(Bytes.wrap(resultSet.getBytes(1)))
next = resultSet.next()
return key
}
}
- override suspend fun keys(): Iterable<Bytes> {
+ override suspend fun keys(): Iterable<K> {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("SELECT $keyColumn FROM $tableName")
stmt.execute()
- return Iterable { SQLIterator(stmt.resultSet) }
+ return Iterable { SQLIterator(stmt.resultSet, keyDeserializer) }
}
}
diff --git a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
index b5d8c2a..19e0f37 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -34,10 +35,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(TempDirectoryExtension.class)
class KeyValueStoreTest {
+ private static final Function<Bytes, Bytes> bytesIdentityFn = Function.identity();
+
@Test
void testPutAndGet() throws Exception {
Map<Bytes, Bytes> map = new HashMap<>();
- KeyValueStore store = MapKeyValueStore.open(map);
+ KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,14 +52,14 @@ class KeyValueStoreTest {
@Test
void testNoValue() throws Exception {
Map<Bytes, Bytes> map = new HashMap<>();
- KeyValueStore store = MapKeyValueStore.open(map);
+ KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
assertNull(store.getAsync(Bytes.of(123)).get());
}
@Test
void testKeys() throws Exception {
Map<Bytes, Bytes> map = new HashMap<>();
- KeyValueStore store = MapKeyValueStore.open(map);
+ KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Set<Bytes> keys = new HashSet<>();
@@ -66,7 +69,12 @@ class KeyValueStoreTest {
@Test
void testLevelDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
- try (LevelDBKeyValueStore leveldb = LevelDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+ try (LevelDBKeyValueStore<Bytes, Bytes> leveldb = LevelDBKeyValueStore.open(
+ tempDirectory.resolve("foo").resolve("bar"),
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn)) {
AsyncCompletion completion = leveldb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = leveldb.getAsync(Bytes.of(123)).get();
@@ -77,7 +85,12 @@ class KeyValueStoreTest {
@Test
void testRocksDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
- try (RocksDBKeyValueStore rocksdb = RocksDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+ try (RocksDBKeyValueStore<Bytes, Bytes> rocksdb = RocksDBKeyValueStore.open(
+ tempDirectory.resolve("foo").resolve("bar"),
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn)) {
AsyncCompletion completion = rocksdb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = rocksdb.getAsync(Bytes.of(123)).get();
diff --git a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
index c218a65..db44ae1 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.tuweni.junit.RedisPort;
import org.apache.tuweni.junit.RedisServerExtension;
import java.net.InetAddress;
+import java.util.function.Function;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
@@ -34,7 +35,8 @@ class RedisKeyValueStoreTest {
@Test
void testPutAndGet(@RedisPort Integer redisPort) throws Exception {
- KeyValueStore store = RedisKeyValueStore.open(redisPort);
+ KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore
+ .open(redisPort, Function.identity(), Function.identity(), Function.identity(), Function.identity());
AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,13 +51,24 @@ class RedisKeyValueStoreTest {
@Test
void testNoValue(@RedisPort Integer redisPort) throws Exception {
- KeyValueStore store = RedisKeyValueStore.open(redisPort, InetAddress.getLoopbackAddress());
+ KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore.open(
+ redisPort,
+ InetAddress.getLoopbackAddress(),
+ Function.identity(),
+ Function.identity(),
+ Function.identity(),
+ Function.identity());
assertNull(store.getAsync(Bytes.of(124)).get());
}
@Test
void testRedisCloseable(@RedisPort Integer redisPort) throws Exception {
- try (RedisKeyValueStore redis = RedisKeyValueStore.open("redis://127.0.0.1:" + redisPort)) {
+ try (RedisKeyValueStore<Bytes, Bytes> redis = RedisKeyValueStore.open(
+ "redis://127.0.0.1:" + redisPort,
+ Function.identity(),
+ Function.identity(),
+ Function.identity(),
+ Function.identity())) {
AsyncCompletion completion = redis.putAsync(Bytes.of(125), Bytes.of(10, 12, 13));
completion.join();
Bytes value = redis.getAsync(Bytes.of(125)).get();
diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
index ae8c62b..94f4ffb 100644
--- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
+++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
@@ -110,7 +110,7 @@ object InfinispanKeyValueStoreSpec : Spek({
runBlocking {
kv.put(bar, foo)
kv.put(foo, bar)
- val keys = (kv.keys().map { it })
+ val keys = (kv.keys().map { it })
keys.should.contain(bar)
keys.should.contain(foo)
}
@@ -120,7 +120,12 @@ object InfinispanKeyValueStoreSpec : Spek({
object MapDBKeyValueStoreSpec : Spek({
val testDir = Files.createTempDirectory("data")
- val kv = MapDBKeyValueStore(testDir.resolve("data.db"))
+ val kv = MapDBKeyValueStore(
+ testDir.resolve("data.db"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
describe("a MapDB-backed key value store") {
@@ -141,12 +146,19 @@ object MapDBKeyValueStoreSpec : Spek({
runBlocking {
kv.put(foobar, foo)
kv.put(foo, bar)
- kv.keys().should.equal(setOf(foobar, foo))
+ val keys = kv.keys().map { it }
+ keys.should.contain(foo)
+ keys.should.contain(foobar)
}
}
it("should not allow usage after the DB is closed") {
- val kv2 = MapDBKeyValueStore(testDir.resolve("data2.db"))
+ val kv2 = MapDBKeyValueStore(
+ testDir.resolve("data2.db"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false
@@ -168,7 +180,12 @@ object MapDBKeyValueStoreSpec : Spek({
object LevelDBKeyValueStoreSpec : Spek({
val path = Files.createTempDirectory("leveldb")
- val kv = LevelDBKeyValueStore(path)
+ val kv = LevelDBKeyValueStore(
+ path,
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
afterGroup {
kv.close()
MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -197,7 +214,12 @@ object LevelDBKeyValueStoreSpec : Spek({
}
it("should not allow usage after the DB is closed") {
- val kv2 = LevelDBKeyValueStore(path.resolve("subdb"))
+ val kv2 = LevelDBKeyValueStore(
+ path.resolve("subdb"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false
@@ -214,7 +236,12 @@ object LevelDBKeyValueStoreSpec : Spek({
object RocksDBKeyValueStoreSpec : Spek({
val path = Files.createTempDirectory("rocksdb")
- val kv = RocksDBKeyValueStore(path)
+ val kv = RocksDBKeyValueStore(
+ path,
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
afterGroup {
kv.close()
MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -245,7 +272,12 @@ object RocksDBKeyValueStoreSpec : Spek({
}
it("should not allow usage after the DB is closed") {
- val kv2 = RocksDBKeyValueStore(path.resolve("subdb"))
+ val kv2 = RocksDBKeyValueStore(
+ path.resolve("subdb"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false
@@ -269,8 +301,21 @@ object SQLKeyValueStoreSpec : Spek({
st.executeUpdate("create table store(key binary, value binary, primary key(key))")
st.executeUpdate("create table store2(id binary, val binary, primary key(id))")
}
- val kv = SQLKeyValueStore(jdbcUrl)
- val otherkv = SQLKeyValueStore.open(jdbcUrl, "store2", "id", "val")
+ val kv = SQLKeyValueStore(
+ jdbcUrl,
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
+ val otherkv = SQLKeyValueStore.open(
+ jdbcUrl,
+ "store2",
+ "id",
+ "val",
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
afterGroup {
kv.close()
otherkv.close()
@@ -316,7 +361,12 @@ object SQLKeyValueStoreSpec : Spek({
}
it("should not allow usage after the DB is closed") {
- val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb")
+ val kv2 = SQLKeyValueStore(
+ "jdbc:h2:mem:testdb",
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org