You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/02/20 23:36:39 UTC
[incubator-tuweni] branch master updated: Add support for clear
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 036743e Add support for clear
036743e is described below
commit 036743e06f5d637d09746ecd7e6eae2ff9e48335
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu Feb 20 15:27:55 2020 -0800
Add support for clear
---
.../main/java/org/apache/tuweni/io/file/Files.java | 2 +-
kv/build.gradle | 1 +
.../apache/tuweni/kv/EntityManagerKeyValueStore.kt | 31 ++++++++--
.../apache/tuweni/kv/InfinispanKeyValueStore.kt | 4 ++
.../kotlin/org/apache/tuweni/kv/KeyValueStore.kt | 10 ++++
.../org/apache/tuweni/kv/LevelDBKeyValueStore.kt | 19 +++++--
.../org/apache/tuweni/kv/MapDBKeyValueStore.kt | 4 ++
.../org/apache/tuweni/kv/MapKeyValueStore.kt | 4 ++
.../org/apache/tuweni/kv/ProxyKeyValueStore.kt | 8 ++-
.../org/apache/tuweni/kv/RedisKeyValueStore.kt | 4 ++
.../org/apache/tuweni/kv/RocksDBKeyValueStore.kt | 21 +++++--
.../org/apache/tuweni/kv/SQLKeyValueStore.kt | 7 +++
kv/src/test/java/org/apache/tuweni/kv/Store.java | 2 +-
.../org/apache/tuweni/kv/KeyValueStoreSpec.kt | 66 +++++++++++++++++++++-
14 files changed, 163 insertions(+), 20 deletions(-)
diff --git a/io/src/main/java/org/apache/tuweni/io/file/Files.java b/io/src/main/java/org/apache/tuweni/io/file/Files.java
index 277b987..05e51f8 100644
--- a/io/src/main/java/org/apache/tuweni/io/file/Files.java
+++ b/io/src/main/java/org/apache/tuweni/io/file/Files.java
@@ -69,7 +69,7 @@ public final class Files {
public static void deleteRecursively(Path directory) throws IOException {
checkNotNull(directory);
- walkFileTree(directory, new SimpleFileVisitor<Path>() {
+ walkFileTree(directory, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
delete(file);
diff --git a/kv/build.gradle b/kv/build.gradle
index b96f9d4..f264a99 100644
--- a/kv/build.gradle
+++ b/kv/build.gradle
@@ -16,6 +16,7 @@ dependencies {
compile project(':bytes')
compile project(':concurrent-coroutines')
+ compile project(':io')
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
index 712aec9..f8b2905 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -38,6 +38,7 @@ class EntityManagerKeyValueStore<K, V>
* Open a relational database backed key-value store using a JPA entity manager.
*
* @param entityManagerProvider The supplier of entity manager to operate.
+ * @param entityClass The class of objects to store in the store
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@@ -75,12 +76,30 @@ class EntityManagerKeyValueStore<K, V>
override suspend fun keys(): Iterable<K> {
val em = entityManagerProvider()
- val query = em.criteriaBuilder.createQuery(entityClass)
- val root = query.from(entityClass)
- val all = query.select(root)
- val finalAll = em.createQuery(all)
- val resultStream: Stream<V> = finalAll.resultStream
- return Iterable { resultStream.map(idAccessor).iterator() }
+ em.transaction.begin()
+ try {
+ val query = em.criteriaBuilder.createQuery(entityClass)
+ val root = query.from(entityClass)
+ val all = query.select(root)
+ val finalAll = em.createQuery(all)
+ val resultStream: Stream<V> = finalAll.resultStream
+ return Iterable { resultStream.map(idAccessor).iterator() }
+ } finally {
+ em.transaction.commit()
+ em.close()
+ }
+ }
+
+ override suspend fun clear() {
+ val em = entityManagerProvider()
+ em.transaction.begin()
+ try {
+ val deleteAll = em.createQuery("DELETE FROM ${em.metamodel.entity(entityClass).name}")
+ deleteAll.executeUpdate()
+ } finally {
+ em.transaction.commit()
+ em.close()
+ }
}
override fun close() {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 50e353f..9bc0f36 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -51,6 +51,10 @@ class InfinispanKeyValueStore<K, V> constructor(
override suspend fun keys(): Iterable<K> = cache.keys
+ override suspend fun clear() {
+ cache.clearAsync().await()
+ }
+
/**
* The cache is managed outside the scope of this key-value store.
*/
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 7f2775a..c7b6d0e 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -78,4 +78,14 @@ interface KeyValueStore<K, V> : Closeable, CoroutineScope {
* @return An [Iterable] allowing to iterate over the set of keys.
*/
fun keysAsync(): AsyncResult<Iterable<K>> = asyncResult { keys() }
+
+ /**
+ * Clears the contents of the store.
+ */
+ suspend fun clear()
+
+ /**
+ * Clears the contents of the store.
+ */
+ suspend fun clearAsync() = asyncResult { clear() }
}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index e30a68f..90f0fab 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -18,6 +18,7 @@ package org.apache.tuweni.kv
import kotlinx.coroutines.Dispatchers
import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.io.file.Files.deleteRecursively
import org.fusesource.leveldbjni.JniDBFactory
import org.iq80.leveldb.DB
import org.iq80.leveldb.DBIterator
@@ -45,12 +46,12 @@ import kotlin.coroutines.CoroutineContext
class LevelDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
- dbPath: Path,
+ private val dbPath: Path,
private val keySerializer: (K) -> Bytes,
private val valueSerializer: (V) -> Bytes,
private val keyDeserializer: (Bytes) -> K,
private val valueDeserializer: (Bytes) -> V,
- options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
+ private val options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
override val coroutineContext: CoroutineContext = Dispatchers.IO
) : KeyValueStore<K, V> {
@@ -130,11 +131,15 @@ constructor(
Function.identity<Bytes>()::apply)
}
- private val db: DB
+ private var db: DB
init {
+ db = create()
+ }
+
+ private fun create(): DB {
Files.createDirectories(dbPath)
- db = JniDBFactory.factory.open(dbPath.toFile(), options)
+ return JniDBFactory.factory.open(dbPath.toFile(), options)
}
override suspend fun get(key: K): V? {
@@ -161,6 +166,12 @@ constructor(
return Iterable { KIterator(iter, keyDeserializer) }
}
+ override suspend fun clear() {
+ close()
+ deleteRecursively(dbPath)
+ db = create()
+ }
+
/**
* Closes the underlying LevelDB instance.
*/
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index 2dae5fe..02908fb 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -126,6 +126,10 @@ constructor(
override suspend fun keys(): Iterable<K> = storageData.keys.map(keyDeserializer)
+ override suspend fun clear() {
+ storageData.clear()
+ }
+
/**
* Closes the underlying MapDB instance.
*/
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index 5eaca56..e7f0e84 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -61,6 +61,10 @@ constructor(
override suspend fun keys(): Iterable<K> = map.keys
+ override suspend fun clear() {
+ map.clear()
+ }
+
/**
* Has no effect in this KeyValueStore implementation.
*/
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
index 0e5aee2..ca439ae 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
@@ -66,9 +66,11 @@ class ProxyKeyValueStore<K, V, E, R>(
}
}
- override suspend fun put(key: K, value: V) = store.put(proxyKey(key), proxyValue(key, value))
+ override suspend fun put(key: K, value: V) = store.put(proxyKey(key), proxyValue(value))
- override suspend fun keys(): Iterable<K> = store.keys().map(unproxyKey)
+ override suspend fun keys(): Iterable<K> = store.keys().map(unproxyKey)
- override fun close() = store.close()
+ override suspend fun clear() = store.clear()
+
+ override fun close() = store.close()
}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index c85f285..b73d87f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -227,6 +227,10 @@ class RedisKeyValueStore<K, V>(
override suspend fun keys(): Iterable<K> = asyncCommands.keys(Bytes.EMPTY).await().map(keyDeserializer)
+ override suspend fun clear() {
+ asyncCommands.flushdb().await()
+ }
+
override fun close() {
conn.close()
}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index 30e8dad..e7f175e 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -18,6 +18,7 @@ package org.apache.tuweni.kv
import kotlinx.coroutines.Dispatchers
import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.io.file.Files.deleteRecursively
import org.rocksdb.Options
import org.rocksdb.RocksDB
import org.rocksdb.RocksIterator
@@ -41,12 +42,12 @@ import kotlin.coroutines.CoroutineContext
class RocksDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
- dbPath: Path,
+ private val dbPath: Path,
private val keySerializer: (K) -> Bytes,
private val valueSerializer: (V) -> Bytes,
private val keyDeserializer: (Bytes) -> K,
private val valueDeserializer: (Bytes) -> V,
- options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
+ private val options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
override val coroutineContext: CoroutineContext = Dispatchers.IO
) : KeyValueStore<K, V> {
@@ -106,13 +107,17 @@ constructor(
options)
}
- private val db: RocksDB
+ private var db: RocksDB
private val closed = AtomicBoolean(false)
init {
RocksDB.loadLibrary()
+ db = create()
+ }
+
+ private fun create(): RocksDB {
Files.createDirectories(dbPath)
- db = RocksDB.open(options, dbPath.toAbsolutePath().toString())
+ return RocksDB.open(options, dbPath.toAbsolutePath().toString())
}
override suspend fun get(key: K): V? {
@@ -154,6 +159,14 @@ constructor(
return Iterable { BytesIterator(iter, keyDeserializer) }
}
+ override suspend fun clear() {
+ close()
+ if (closed.compareAndSet(true, false)) {
+ deleteRecursively(dbPath)
+ db = create()
+ }
+ }
+
/**
* Closes the underlying RocksDB instance.
*/
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index 46c6e93..c9095cc 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -179,6 +179,13 @@ constructor(
}
}
+ override suspend fun clear() {
+ connectionPool.asyncConnection.await().use {
+ val stmt = it.prepareStatement("DELETE FROM $tableName")
+ stmt.execute()
+ }
+ }
+
/**
* Closes the underlying connection pool.
*/
diff --git a/kv/src/test/java/org/apache/tuweni/kv/Store.java b/kv/src/test/java/org/apache/tuweni/kv/Store.java
index 069db93..68f37f0 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/Store.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/Store.java
@@ -16,7 +16,7 @@ import java.util.Objects;
import javax.persistence.Entity;
import javax.persistence.Id;
-@Entity(name = "STORE")
+@Entity(name = "FOO_STORE")
public class Store {
public Store() {}
diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
index ec2ab53..6a05b3c 100644
--- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
+++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
@@ -77,6 +77,14 @@ object KeyValueStoreSpec : Spek({
kv.keys().should.equal(setOf(foobar, foo))
}
}
+
+ it("can clear its contents") {
+ runBlocking {
+ kv.put(foobar, foo)
+ kv.clear()
+ kv.get(foobar).should.be.`null`
+ }
+ }
}
})
@@ -117,6 +125,14 @@ object InfinispanKeyValueStoreSpec : Spek({
keys.should.contain(foo)
}
}
+
+ it("can clear its contents") {
+ runBlocking {
+ kv.put(foobar, foo)
+ kv.clear()
+ kv.get(foobar).should.be.`null`
+ }
+ }
}
})
@@ -154,6 +170,14 @@ object MapDBKeyValueStoreSpec : Spek({
}
}
+ it("can clear its contents") {
+ runBlocking {
+ kv.put(foobar, foo)
+ kv.clear()
+ kv.get(foobar).should.be.`null`
+ }
+ }
+
it("should not allow usage after the DB is closed") {
val kv2 = MapDBKeyValueStore(
testDir.resolve("data2.db"),
@@ -215,6 +239,14 @@ object LevelDBKeyValueStoreSpec : Spek({
}
}
+ it("can clear its contents") {
+ runBlocking {
+ kv.put(foobar, foo)
+ kv.clear()
+ kv.get(foobar).should.be.`null`
+ }
+ }
+
it("should not allow usage after the DB is closed") {
val kv2 = LevelDBKeyValueStore(
path.resolve("subdb"),
@@ -273,6 +305,14 @@ object RocksDBKeyValueStoreSpec : Spek({
}
}
+ it("can clear its contents") {
+ runBlocking {
+ kv.put(foobar, foo)
+ kv.clear()
+ kv.get(foobar).should.be.`null`
+ }
+ }
+
it("should not allow usage after the DB is closed") {
val kv2 = RocksDBKeyValueStore(
path.resolve("subdb"),
@@ -362,6 +402,14 @@ object SQLKeyValueStoreSpec : Spek({
}
}
+ it("can clear its contents") {
+ runBlocking {
+ kv.put(foobar, foo)
+ kv.clear()
+ kv.get(foobar).should.be.`null`
+ }
+ }
+
it("should not allow usage after the DB is closed") {
val kv2 = SQLKeyValueStore(
"jdbc:h2:mem:testdb",
@@ -429,6 +477,14 @@ object EntityManagerKeyValueStoreSpec : Spek({
keys.should.contain("foo")
}
}
+
+ it("can clear its contents") {
+ runBlocking {
+ kv.put(Store("foo", "bar"))
+ kv.clear()
+ kv.get("foo").should.be.`null`
+ }
+ }
}
})
@@ -444,7 +500,7 @@ object ProxyKeyValueStoreSpec : Spek({
afterGroup {
proxy.close()
}
- describe("a proixy key value store") {
+ describe("a proxy key value store") {
it("should allow to retrieve values") {
runBlocking {
@@ -477,5 +533,13 @@ object ProxyKeyValueStoreSpec : Spek({
keys.should.contain(foo)
}
}
+
+ it("can clear its contents") {
+ runBlocking {
+ proxy.put(foo, bar)
+ proxy.clear()
+ proxy.get(foo).should.be.`null`
+ }
+ }
}
})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org