You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:46 UTC

[30/50] [abbrv] samza git commit: SAMZA-819 RocksDbKeyValueStore.flush() should be implemented

SAMZA-819 RocksDbKeyValueStore.flush() should be implemented


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c84a0b54
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c84a0b54
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c84a0b54

Branch: refs/heads/samza-sql
Commit: c84a0b54d634de5a4c996ae3ce9742d98ae23342
Parents: 429f245
Author: Tao Feng <fe...@gmail.com>
Authored: Fri Nov 20 14:43:36 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Fri Nov 20 14:43:36 2015 -0800

----------------------------------------------------------------------
 .../RocksDbKeyValueStorageEngineFactory.scala   |  5 ++--
 .../samza/storage/kv/RocksDbKeyValueStore.scala |  5 ++--
 .../storage/kv/TestRocksDbKeyValueStore.scala   | 24 +++++++++++++++++++-
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index b949793..dae6e35 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -24,7 +24,7 @@ import org.apache.samza.container.SamzaContainerContext
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.storage.kv._
 import org.apache.samza.system.SystemStreamPartition
-import org.rocksdb.WriteOptions
+import org.rocksdb.{FlushOptions, WriteOptions}
 import org.apache.samza.config.StorageConfig._
 
 class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V]
@@ -48,7 +48,8 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
     val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
     val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
     val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
-    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbMetrics)
+    val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)
+    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbFlushOptions, rocksDbMetrics)
     rocksDb
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 4620037..211fc3b 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -95,6 +95,7 @@ class RocksDbKeyValueStore(
   val isLoggedStore: Boolean,
   val storeName: String,
   val writeOptions: WriteOptions = new WriteOptions(),
+  val flushOptions: FlushOptions = new FlushOptions(),
   val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
 
   // lazy val here is important because the store directories do not exist yet, it can only be opened
@@ -190,8 +191,8 @@ class RocksDbKeyValueStore(
 
   def flush {
     metrics.flushes.inc
-    // TODO still not exposed in Java RocksDB API, follow up with rocksDB team
-    trace("Flush in RocksDbKeyValueStore is not supported, ignoring")
+    trace("Flushing.")
+    db.flush(flushOptions)
   }
 
   def close() {

http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index a428a16..0c86a5a 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -30,7 +30,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{NoOpMetricsRegistry, ExponentialSleepStrategy}
 import org.apache.samza.util.Util._
 import org.junit.{Assert, Test}
-import org.rocksdb.{RocksDBException, Options}
+import org.rocksdb.{RocksDB, FlushOptions, RocksDBException, Options}
 
 class TestRocksDbKeyValueStore
 {
@@ -65,4 +65,26 @@ class TestRocksDbKeyValueStore
     Assert.assertNull(rocksDB.get(key))
     rocksDB.close()
   }
+
+  @Test
+  def testFlush(): Unit = {
+    val map = new util.HashMap[String, String]()
+    val config = new MapConfig(map)
+    val flushOptions = new FlushOptions().setWaitForFlush(true)
+    val options = new Options()
+    options.setCreateIfMissing(true)
+    val rocksDB = RocksDbKeyValueStore.openDB(new File(System.getProperty("java.io.tmpdir")),
+                                              options,
+                                              config,
+                                              false,
+                                              "dbStore")
+    val key = "key".getBytes("UTF-8")
+    rocksDB.put(key, "val".getBytes("UTF-8"))
+    rocksDB.flush(flushOptions)
+    val dbDir = new File(System.getProperty("java.io.tmpdir")).toString
+    val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir)
+    Assert.assertEquals(new String(rocksDBReadOnly.get(key), "UTF-8"), "val")
+    rocksDB.close()
+    rocksDBReadOnly.close()
+  }
 }