You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/02/20 23:36:39 UTC

[incubator-tuweni] branch master updated: Add support for clear

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 036743e  Add support for clear
036743e is described below

commit 036743e06f5d637d09746ecd7e6eae2ff9e48335
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu Feb 20 15:27:55 2020 -0800

    Add support for clear
---
 .../main/java/org/apache/tuweni/io/file/Files.java |  2 +-
 kv/build.gradle                                    |  1 +
 .../apache/tuweni/kv/EntityManagerKeyValueStore.kt | 31 ++++++++--
 .../apache/tuweni/kv/InfinispanKeyValueStore.kt    |  4 ++
 .../kotlin/org/apache/tuweni/kv/KeyValueStore.kt   | 10 ++++
 .../org/apache/tuweni/kv/LevelDBKeyValueStore.kt   | 19 +++++--
 .../org/apache/tuweni/kv/MapDBKeyValueStore.kt     |  4 ++
 .../org/apache/tuweni/kv/MapKeyValueStore.kt       |  4 ++
 .../org/apache/tuweni/kv/ProxyKeyValueStore.kt     |  8 ++-
 .../org/apache/tuweni/kv/RedisKeyValueStore.kt     |  4 ++
 .../org/apache/tuweni/kv/RocksDBKeyValueStore.kt   | 21 +++++--
 .../org/apache/tuweni/kv/SQLKeyValueStore.kt       |  7 +++
 kv/src/test/java/org/apache/tuweni/kv/Store.java   |  2 +-
 .../org/apache/tuweni/kv/KeyValueStoreSpec.kt      | 66 +++++++++++++++++++++-
 14 files changed, 163 insertions(+), 20 deletions(-)

diff --git a/io/src/main/java/org/apache/tuweni/io/file/Files.java b/io/src/main/java/org/apache/tuweni/io/file/Files.java
index 277b987..05e51f8 100644
--- a/io/src/main/java/org/apache/tuweni/io/file/Files.java
+++ b/io/src/main/java/org/apache/tuweni/io/file/Files.java
@@ -69,7 +69,7 @@ public final class Files {
   public static void deleteRecursively(Path directory) throws IOException {
     checkNotNull(directory);
 
-    walkFileTree(directory, new SimpleFileVisitor<Path>() {
+    walkFileTree(directory, new SimpleFileVisitor<>() {
       @Override
       public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
         delete(file);
diff --git a/kv/build.gradle b/kv/build.gradle
index b96f9d4..f264a99 100644
--- a/kv/build.gradle
+++ b/kv/build.gradle
@@ -16,6 +16,7 @@ dependencies {
 
   compile project(':bytes')
   compile project(':concurrent-coroutines')
+  compile project(':io')
   compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
   compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava'
   compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
index 712aec9..f8b2905 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -38,6 +38,7 @@ class EntityManagerKeyValueStore<K, V>
      * Open a relational database backed key-value store using a JPA entity manager.
      *
      * @param entityManagerProvider The supplier of entity manager to operate.
+     * @param entityClass The class of objects to store in the store
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
@@ -75,12 +76,30 @@ class EntityManagerKeyValueStore<K, V>
 
   override suspend fun keys(): Iterable<K> {
     val em = entityManagerProvider()
-    val query = em.criteriaBuilder.createQuery(entityClass)
-    val root = query.from(entityClass)
-    val all = query.select(root)
-    val finalAll = em.createQuery(all)
-    val resultStream: Stream<V> = finalAll.resultStream
-    return Iterable { resultStream.map(idAccessor).iterator() }
+    em.transaction.begin()
+    try {
+      val query = em.criteriaBuilder.createQuery(entityClass)
+      val root = query.from(entityClass)
+      val all = query.select(root)
+      val finalAll = em.createQuery(all)
+      val resultStream: Stream<V> = finalAll.resultStream
+      return Iterable { resultStream.map(idAccessor).iterator() }
+    } finally {
+      em.transaction.commit()
+      em.close()
+    }
+  }
+
+  override suspend fun clear() {
+    val em = entityManagerProvider()
+    em.transaction.begin()
+    try {
+      val deleteAll = em.createQuery("DELETE FROM ${em.metamodel.entity(entityClass).name}")
+      deleteAll.executeUpdate()
+    } finally {
+      em.transaction.commit()
+      em.close()
+    }
   }
 
   override fun close() {
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 50e353f..9bc0f36 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -51,6 +51,10 @@ class InfinispanKeyValueStore<K, V> constructor(
 
   override suspend fun keys(): Iterable<K> = cache.keys
 
+  override suspend fun clear() {
+    cache.clearAsync().await()
+  }
+
   /**
    * The cache is managed outside the scope of this key-value store.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 7f2775a..c7b6d0e 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -78,4 +78,14 @@ interface KeyValueStore<K, V> : Closeable, CoroutineScope {
    * @return An [Iterable] allowing to iterate over the set of keys.
    */
   fun keysAsync(): AsyncResult<Iterable<K>> = asyncResult { keys() }
+
+  /**
+   * Clears the contents of the store.
+   */
+  suspend fun clear()
+
+  /**
+   * Clears the contents of the store, returning an asynchronous result for the operation.
+   */
+  suspend fun clearAsync() = asyncResult { clear() }
 }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index e30a68f..90f0fab 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -18,6 +18,7 @@ package org.apache.tuweni.kv
 
 import kotlinx.coroutines.Dispatchers
 import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.io.file.Files.deleteRecursively
 import org.fusesource.leveldbjni.JniDBFactory
 import org.iq80.leveldb.DB
 import org.iq80.leveldb.DBIterator
@@ -45,12 +46,12 @@ import kotlin.coroutines.CoroutineContext
 class LevelDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
-  dbPath: Path,
+  private val dbPath: Path,
   private val keySerializer: (K) -> Bytes,
   private val valueSerializer: (V) -> Bytes,
   private val keyDeserializer: (Bytes) -> K,
   private val valueDeserializer: (Bytes) -> V,
-  options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
+  private val options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : KeyValueStore<K, V> {
 
@@ -130,11 +131,15 @@ constructor(
         Function.identity<Bytes>()::apply)
   }
 
-  private val db: DB
+  private var db: DB
 
   init {
+    db = create()
+  }
+
+  private fun create(): DB {
     Files.createDirectories(dbPath)
-    db = JniDBFactory.factory.open(dbPath.toFile(), options)
+    return JniDBFactory.factory.open(dbPath.toFile(), options)
   }
 
   override suspend fun get(key: K): V? {
@@ -161,6 +166,12 @@ constructor(
     return Iterable { KIterator(iter, keyDeserializer) }
   }
 
+  override suspend fun clear() {
+    close()
+    deleteRecursively(dbPath)
+    db = create()
+  }
+
   /**
    * Closes the underlying LevelDB instance.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index 2dae5fe..02908fb 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -126,6 +126,10 @@ constructor(
 
   override suspend fun keys(): Iterable<K> = storageData.keys.map(keyDeserializer)
 
+  override suspend fun clear() {
+    storageData.clear()
+  }
+
   /**
    * Closes the underlying MapDB instance.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index 5eaca56..e7f0e84 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -61,6 +61,10 @@ constructor(
 
   override suspend fun keys(): Iterable<K> = map.keys
 
+  override suspend fun clear() {
+    map.clear()
+  }
+
   /**
    * Has no effect in this KeyValueStore implementation.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
index 0e5aee2..ca439ae 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/ProxyKeyValueStore.kt
@@ -66,9 +66,11 @@ class ProxyKeyValueStore<K, V, E, R>(
     }
   }
 
-    override suspend fun put(key: K, value: V) = store.put(proxyKey(key), proxyValue(key, value))
+  override suspend fun put(key: K, value: V) = store.put(proxyKey(key), proxyValue(value))
 
-    override suspend fun keys(): Iterable<K> = store.keys().map(unproxyKey)
+  override suspend fun keys(): Iterable<K> = store.keys().map(unproxyKey)
 
-    override fun close() = store.close()
+  override suspend fun clear() = store.clear()
+
+  override fun close() = store.close()
 }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index c85f285..b73d87f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -227,6 +227,10 @@ class RedisKeyValueStore<K, V>(
 
   override suspend fun keys(): Iterable<K> = asyncCommands.keys(Bytes.EMPTY).await().map(keyDeserializer)
 
+  override suspend fun clear() {
+    asyncCommands.flushdb().await()
+  }
+
   override fun close() {
     conn.close()
   }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index 30e8dad..e7f175e 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -18,6 +18,7 @@ package org.apache.tuweni.kv
 
 import kotlinx.coroutines.Dispatchers
 import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.io.file.Files.deleteRecursively
 import org.rocksdb.Options
 import org.rocksdb.RocksDB
 import org.rocksdb.RocksIterator
@@ -41,12 +42,12 @@ import kotlin.coroutines.CoroutineContext
 class RocksDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
-  dbPath: Path,
+  private val dbPath: Path,
   private val keySerializer: (K) -> Bytes,
   private val valueSerializer: (V) -> Bytes,
   private val keyDeserializer: (Bytes) -> K,
   private val valueDeserializer: (Bytes) -> V,
-  options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
+  private val options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : KeyValueStore<K, V> {
 
@@ -106,13 +107,17 @@ constructor(
         options)
   }
 
-  private val db: RocksDB
+  private var db: RocksDB
   private val closed = AtomicBoolean(false)
 
   init {
     RocksDB.loadLibrary()
+    db = create()
+  }
+
+  private fun create(): RocksDB {
     Files.createDirectories(dbPath)
-    db = RocksDB.open(options, dbPath.toAbsolutePath().toString())
+    return RocksDB.open(options, dbPath.toAbsolutePath().toString())
   }
 
   override suspend fun get(key: K): V? {
@@ -154,6 +159,14 @@ constructor(
     return Iterable { BytesIterator(iter, keyDeserializer) }
   }
 
+  override suspend fun clear() {
+    close()
+    if (closed.compareAndSet(true, false)) {
+      deleteRecursively(dbPath)
+      db = create()
+    }
+  }
+
   /**
    * Closes the underlying RocksDB instance.
    */
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index 46c6e93..c9095cc 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -179,6 +179,13 @@ constructor(
     }
   }
 
+  override suspend fun clear() {
+    connectionPool.asyncConnection.await().use {
+      val stmt = it.prepareStatement("DELETE FROM $tableName")
+      stmt.execute()
+    }
+  }
+
   /**
    * Closes the underlying connection pool.
    */
diff --git a/kv/src/test/java/org/apache/tuweni/kv/Store.java b/kv/src/test/java/org/apache/tuweni/kv/Store.java
index 069db93..68f37f0 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/Store.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/Store.java
@@ -16,7 +16,7 @@ import java.util.Objects;
 import javax.persistence.Entity;
 import javax.persistence.Id;
 
-@Entity(name = "STORE")
+@Entity(name = "FOO_STORE")
 public class Store {
 
   public Store() {}
diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
index ec2ab53..6a05b3c 100644
--- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
+++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
@@ -77,6 +77,14 @@ object KeyValueStoreSpec : Spek({
         kv.keys().should.equal(setOf(foobar, foo))
       }
     }
+
+    it("can clear its contents") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.clear()
+        kv.get(foobar).should.be.`null`
+      }
+    }
   }
 })
 
@@ -117,6 +125,14 @@ object InfinispanKeyValueStoreSpec : Spek({
         keys.should.contain(foo)
       }
     }
+
+    it("can clear its contents") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.clear()
+        kv.get(foobar).should.be.`null`
+      }
+    }
   }
 })
 
@@ -154,6 +170,14 @@ object MapDBKeyValueStoreSpec : Spek({
       }
     }
 
+    it("can clear its contents") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.clear()
+        kv.get(foobar).should.be.`null`
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = MapDBKeyValueStore(
         testDir.resolve("data2.db"),
@@ -215,6 +239,14 @@ object LevelDBKeyValueStoreSpec : Spek({
       }
     }
 
+    it("can clear its contents") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.clear()
+        kv.get(foobar).should.be.`null`
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = LevelDBKeyValueStore(
         path.resolve("subdb"),
@@ -273,6 +305,14 @@ object RocksDBKeyValueStoreSpec : Spek({
       }
     }
 
+    it("can clear its contents") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.clear()
+        kv.get(foobar).should.be.`null`
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = RocksDBKeyValueStore(
         path.resolve("subdb"),
@@ -362,6 +402,14 @@ object SQLKeyValueStoreSpec : Spek({
       }
     }
 
+    it("can clear its contents") {
+      runBlocking {
+        kv.put(foobar, foo)
+        kv.clear()
+        kv.get(foobar).should.be.`null`
+      }
+    }
+
     it("should not allow usage after the DB is closed") {
       val kv2 = SQLKeyValueStore(
         "jdbc:h2:mem:testdb",
@@ -429,6 +477,14 @@ object EntityManagerKeyValueStoreSpec : Spek({
         keys.should.contain("foo")
       }
     }
+
+    it("can clear its contents") {
+      runBlocking {
+        kv.put(Store("foo", "bar"))
+        kv.clear()
+        kv.get("foo").should.be.`null`
+      }
+    }
   }
 })
 
@@ -444,7 +500,7 @@ object ProxyKeyValueStoreSpec : Spek({
   afterGroup {
     proxy.close()
   }
-  describe("a proixy key value store") {
+  describe("a proxy key value store") {
 
     it("should allow to retrieve values") {
       runBlocking {
@@ -477,5 +533,13 @@ object ProxyKeyValueStoreSpec : Spek({
         keys.should.contain(foo)
       }
     }
+
+    it("can clear its contents") {
+      runBlocking {
+        proxy.put(foo, bar)
+        proxy.clear()
+        proxy.get(foo).should.be.`null`
+      }
+    }
   }
 })


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org