You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:46 UTC
[30/50] [abbrv] samza git commit: SAMZA-819 RocksDbKeyValueStore.flush() should be implemented
SAMZA-819 RocksDbKeyValueStore.flush() should be implemented
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c84a0b54
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c84a0b54
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c84a0b54
Branch: refs/heads/samza-sql
Commit: c84a0b54d634de5a4c996ae3ce9742d98ae23342
Parents: 429f245
Author: Tao Feng <fe...@gmail.com>
Authored: Fri Nov 20 14:43:36 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Fri Nov 20 14:43:36 2015 -0800
----------------------------------------------------------------------
.../RocksDbKeyValueStorageEngineFactory.scala | 5 ++--
.../samza/storage/kv/RocksDbKeyValueStore.scala | 5 ++--
.../storage/kv/TestRocksDbKeyValueStore.scala | 24 +++++++++++++++++++-
3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index b949793..dae6e35 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -24,7 +24,7 @@ import org.apache.samza.container.SamzaContainerContext
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.storage.kv._
import org.apache.samza.system.SystemStreamPartition
-import org.rocksdb.WriteOptions
+import org.rocksdb.{FlushOptions, WriteOptions}
import org.apache.samza.config.StorageConfig._
class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V]
@@ -48,7 +48,8 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
- val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbMetrics)
+ val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)
+ val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbFlushOptions, rocksDbMetrics)
rocksDb
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 4620037..211fc3b 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -95,6 +95,7 @@ class RocksDbKeyValueStore(
val isLoggedStore: Boolean,
val storeName: String,
val writeOptions: WriteOptions = new WriteOptions(),
+ val flushOptions: FlushOptions = new FlushOptions(),
val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
// lazy val here is important because the store directories do not exist yet, it can only be opened
@@ -190,8 +191,8 @@ class RocksDbKeyValueStore(
def flush {
metrics.flushes.inc
- // TODO still not exposed in Java RocksDB API, follow up with rocksDB team
- trace("Flush in RocksDbKeyValueStore is not supported, ignoring")
+ trace("Flushing.")
+ db.flush(flushOptions)
}
def close() {
http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index a428a16..0c86a5a 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -30,7 +30,7 @@ import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{NoOpMetricsRegistry, ExponentialSleepStrategy}
import org.apache.samza.util.Util._
import org.junit.{Assert, Test}
-import org.rocksdb.{RocksDBException, Options}
+import org.rocksdb.{RocksDB, FlushOptions, RocksDBException, Options}
class TestRocksDbKeyValueStore
{
@@ -65,4 +65,26 @@ class TestRocksDbKeyValueStore
Assert.assertNull(rocksDB.get(key))
rocksDB.close()
}
+
+ @Test
+ def testFlush(): Unit = {
+ val map = new util.HashMap[String, String]()
+ val config = new MapConfig(map)
+ val flushOptions = new FlushOptions().setWaitForFlush(true)
+ val options = new Options()
+ options.setCreateIfMissing(true)
+ val rocksDB = RocksDbKeyValueStore.openDB(new File(System.getProperty("java.io.tmpdir")),
+ options,
+ config,
+ false,
+ "dbStore")
+ val key = "key".getBytes("UTF-8")
+ rocksDB.put(key, "val".getBytes("UTF-8"))
+ rocksDB.flush(flushOptions)
+ val dbDir = new File(System.getProperty("java.io.tmpdir")).toString
+ val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir)
+ Assert.assertEquals(new String(rocksDBReadOnly.get(key), "UTF-8"), "val")
+ rocksDB.close()
+ rocksDBReadOnly.close()
+ }
}