You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/16 17:48:25 UTC
git commit: SAMZA-236; add a rocksdb state implementation
Repository: incubator-samza
Updated Branches:
refs/heads/master a66a66c53 -> 9147c343b
SAMZA-236; add a rocksdb state implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9147c343
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9147c343
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9147c343
Branch: refs/heads/master
Commit: 9147c343be9a85d49630bd7522c79e825f5682f2
Parents: a66a66c
Author: Naveen Somasundaram <na...@gmail.com>
Authored: Thu Oct 16 08:48:12 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Oct 16 08:48:12 2014 -0700
----------------------------------------------------------------------
build.gradle | 18 +-
gradle/dependency-versions.gradle | 1 +
.../samza/util/LexicographicComparator.scala | 39 ++++
.../samza/storage/kv/LevelDbKeyValueStore.scala | 17 +-
.../RocksDbKeyValueStorageEngineFactory.scala | 50 +++++
.../samza/storage/kv/RocksDbKeyValueStore.scala | 224 +++++++++++++++++++
.../performance/TestKeyValuePerformance.scala | 2 -
.../samza/storage/kv/TestKeyValueStores.scala | 85 ++++---
settings.gradle | 1 +
9 files changed, 386 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1b37dbb..99f4354 100644
--- a/build.gradle
+++ b/build.gradle
@@ -33,7 +33,8 @@ allprojects {
}
mavenCentral()
mavenLocal()
- }
+ }
+
}
apply from: file("gradle/dependency-versions.gradle")
@@ -345,6 +346,20 @@ project(":samza-kv-leveldb_$scalaVersion") {
}
}
+project(":samza-kv-rocksdb_$scalaVersion") {
+ apply plugin: 'scala'
+
+ dependencies {
+ compile project(':samza-api')
+ compile project(":samza-core_$scalaVersion")
+ compile project(":samza-kv_$scalaVersion")
+ compile "org.scala-lang:scala-library:$scalaLibVersion"
+ compile "org.rocksdb:rocksdbjni:$rocksdbVersion"
+ testCompile "junit:junit:$junitVersion"
+ testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+ }
+}
+
project(":samza-test_$scalaVersion") {
apply plugin: 'scala'
@@ -359,6 +374,7 @@ project(":samza-test_$scalaVersion") {
compile project(':samza-api')
compile project(":samza-kv-inmemory_$scalaVersion")
compile project(":samza-kv-leveldb_$scalaVersion")
+ compile project(":samza-kv-rocksdb_$scalaVersion")
compile project(":samza-core_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index fe2e446..44dd426 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -29,6 +29,7 @@
kafkaVersion = "0.8.1.1"
commonsHttpClientVersion = "3.1"
leveldbVersion = "1.8"
+ rocksdbVersion = "3.5.1"
yarnVersion = "2.4.0"
slf4jVersion = "1.6.2"
log4jVersion = "1.2.17"
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
new file mode 100644
index 0000000..93c5220
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.Comparator
+
+/**
+ * A comparator that applies a lexicographical comparison on byte arrays.
+ */
+class LexicographicComparator extends Comparator[Array[Byte]] {
+ def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
+ val l = math.min(k1.length, k2.length)
+ var i = 0
+ while (i < l) {
+ if (k1(i) != k2(i))
+ return (k1(i) & 0xff) - (k2(i) & 0xff)
+ i += 1
+ }
+ // okay prefixes are equal, the shorter array is less
+ k1.length - k2.length
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index 853de12..f4a021a 100644
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -24,7 +24,7 @@ import org.fusesource.leveldbjni.JniDBFactory._
import java.io._
import org.apache.samza.config.Config
import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{LexicographicComparator, Logging}
object LevelDbKeyValueStore extends Logging {
@@ -64,7 +64,7 @@ class LevelDbKeyValueStore(
val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
private lazy val db = factory.open(dir, options)
- private val lexicographic = new LexicographicComparator()
+ private val lexicographic = new LevelDBLexicographicComparator()
private var deletesSinceLastCompaction = 0
def get(key: Array[Byte]): Array[Byte] = {
@@ -209,18 +209,7 @@ class LevelDbKeyValueStore(
/**
* Compare two array lexicographically using unsigned byte arithmetic
*/
- class LexicographicComparator extends DBComparator {
- def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
- val l = math.min(k1.length, k2.length)
- var i = 0
- while (i < l) {
- if (k1(i) != k2(i))
- return (k1(i) & 0xff) - (k2(i) & 0xff)
- i += 1
- }
- // okay prefixes are equal, the shorter array is less
- k1.length - k2.length
- }
+ class LevelDBLexicographicComparator extends LexicographicComparator with DBComparator {
def name(): String = "lexicographic"
def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
def findShortSuccessor(key: Array[Byte]) = key
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..a52731b
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.io.File
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.storage.kv._
+import org.apache.samza.system.SystemStreamPartition
+
+class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V]
+{
+ /**
+ * Return a KeyValueStore instance for the given store name
+ * @param storeName Name of the store
+ * @param storeDir The directory of the store
+ * @param registry MetricsRegistry to which to publish store specific metrics.
+ * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+ * @param containerContext Information about the container in which the task is executing.
+ * @return A valid KeyValueStore instance
+ */
+ override def getKVStore(storeName: String,
+ storeDir: File,
+ registry: MetricsRegistry,
+ changeLogSystemStreamPartition: SystemStreamPartition,
+ containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+ val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
+ val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
+ val rocksDbOptions = RocksDbKeyValueStore.options(storageConfig, containerContext)
+ val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbMetrics)
+ rocksDb
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
new file mode 100644
index 0000000..4b23c9b
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+import org.apache.samza.util.{ LexicographicComparator, Logging }
+import org.apache.samza.config.Config
+import org.apache.samza.container.SamzaContainerContext
+import org.rocksdb._
+
+object RocksDbKeyValueStore extends Logging {
+ def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
+ val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
+ val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
+ val options = new Options();
+
+ // Cache size and write buffer size are specified on a per-container basis.
+ val numTasks = containerContext.taskNames.size
+ options.setWriteBufferSize((writeBufSize / numTasks).toInt)
+ var cacheSizePerContainer = cacheSize / numTasks
+ options.setCompressionType(
+ storeConfig.get("rocksdb.compression", "snappy") match {
+ case "snappy" => CompressionType.SNAPPY_COMPRESSION
+ case "bzip2" => CompressionType.BZLIB2_COMPRESSION
+ case "zlib" => CompressionType.ZLIB_COMPRESSION
+ case "lz4" => CompressionType.LZ4_COMPRESSION
+ case "lz4hc" => CompressionType.LZ4HC_COMPRESSION
+ case "none" => CompressionType.NO_COMPRESSION
+ case _ =>
+ warn("Unknown rocksdb.compression codec %s, defaulting to Snappy" format storeConfig.get("rocksdb.compression", "snappy"))
+ CompressionType.SNAPPY_COMPRESSION
+ })
+
+ val blockSize = storeConfig.getInt("rocksdb.block.size.bytes", 4096)
+ // We compute the cache size based on the overall container cache size,
+ // however, if rocksdb.cache.size.bytes is overridden, then we use that.
+ cacheSizePerContainer = storeConfig.getLong("rocksdb.cache.size.bytes", cacheSizePerContainer)
+ val bloomBits = storeConfig.getInt("rocksdb.bloomfilter.bits", 10)
+ val table_options = new BlockBasedTableConfig()
+ table_options.setBlockCacheSize(cacheSizePerContainer)
+ .setBlockSize(blockSize)
+ .setFilterBitsPerKey(bloomBits)
+
+ options.setTableFormatConfig(table_options)
+ options.setCompactionStyle(
+ storeConfig.get("rocksdb.compaction.style", "universal") match {
+ case "universal" => CompactionStyle.UNIVERSAL
+ case "fifo" => CompactionStyle.FIFO
+ case "level" => CompactionStyle.LEVEL
+ case _ =>
+ warn("Unknown rocksdb.compactionStyle %s, defaulting to universal" format storeConfig.get("rocksdb.compaction.style", "universal"))
+ CompactionStyle.UNIVERSAL
+ })
+
+ options.setMaxWriteBufferNumber(storeConfig.get("rocksdb.num.write.buffers", "3").toInt)
+ options.setCreateIfMissing(true)
+ options.setErrorIfExists(true)
+ options
+ }
+
+}
+
+class RocksDbKeyValueStore(
+ val dir: File,
+ val options: Options,
+ val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+
+ private lazy val db = RocksDB.open(options, dir.toString)
+ private val lexicographic = new LexicographicComparator()
+ private var deletesSinceLastCompaction = 0
+
+ def get(key: Array[Byte]): Array[Byte] = {
+ metrics.gets.inc
+ require(key != null, "Null key not allowed.")
+ val found = db.get(key)
+ if (found != null) {
+ metrics.bytesRead.inc(found.size)
+ }
+ found
+ }
+
+ def put(key: Array[Byte], value: Array[Byte]) {
+ metrics.puts.inc
+ require(key != null, "Null key not allowed.")
+ if (value == null) {
+ db.remove(key)
+ deletesSinceLastCompaction += 1
+ } else {
+ metrics.bytesWritten.inc(key.size + value.size)
+ db.put(key, value)
+ }
+ }
+
+ // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262
+ def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
+ val iter = entries.iterator
+ var wrote = 0
+ var deletes = 0
+ while (iter.hasNext) {
+ wrote += 1
+ val curr = iter.next()
+ if (curr.getValue == null) {
+ deletes += 1
+ db.remove(curr.getKey);
+ } else {
+ val key = curr.getKey
+ val value = curr.getValue
+ metrics.bytesWritten.inc(key.size + value.size)
+ db.put(key, value)
+ }
+ }
+ metrics.puts.inc(wrote)
+ metrics.deletes.inc(deletes)
+ deletesSinceLastCompaction += deletes
+ }
+
+ def delete(key: Array[Byte]) {
+ metrics.deletes.inc
+ put(key, null)
+ }
+
+ def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ metrics.ranges.inc
+ require(from != null && to != null, "Null bound not allowed.")
+ new RocksDbRangeIterator(db.newIterator(), from, to)
+ }
+
+ def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ metrics.alls.inc
+ val iter = db.newIterator()
+ iter.seekToFirst()
+ new RocksDbIterator(iter)
+ }
+
+ def flush {
+ metrics.flushes.inc
+ // TODO still not exposed in Java RocksDB API, follow up with rocksDB team
+ trace("Flush in RocksDbKeyValueStore is not supported, ignoring")
+ }
+
+ def close() {
+ trace("Closing.")
+ db.close()
+ }
+
+ class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
+ private var open = true
+ private var firstValueAccessed = false;
+ def close() = {
+ open = false
+ iter.dispose()
+ }
+
+ def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove");
+
+ def hasNext() = iter.isValid
+
+ // The iterator is already pointing to the next element
+ protected def peekKey() = {
+ getEntry().getKey
+ }
+
+ protected def getEntry() = {
+ val key = iter.key
+ val value = iter.value
+ new Entry(key, value)
+ }
+
+ // By virtue of how RocksdbIterator is implemented, the implementation of
+ // our iterator is slightly different from standard java iterator next will
+ // always point to the current element, when next is called, we return the
+ // current element we are pointing to and advance the iterator to the next
+ // location (The new location may or may not be valid - this will surface
+ // when the next next() call is made, the isValid will fail)
+ def next() = {
+ if (!hasNext()) {
+ throw new NoSuchElementException
+ }
+
+ val entry = getEntry()
+ iter.next
+ metrics.bytesRead.inc(entry.getKey.size)
+ if (entry.getValue != null) {
+ metrics.bytesRead.inc(entry.getValue.size)
+ }
+ entry
+ }
+
+ override def finalize() {
+ if (open) {
+ trace("Leaked reference to level db iterator, forcing close.")
+ close()
+ }
+ }
+ }
+
+ class RocksDbRangeIterator(iter: RocksIterator, from: Array[Byte], to: Array[Byte]) extends RocksDbIterator(iter) {
+ // RocksDB's JNI interface does not expose getters/setters that allow the
+ // comparator to be pluggable, and the default is lexicographic, so it's
+ // safe to just force lexicographic comparator here for now.
+ val comparator = lexicographic
+ iter.seek(from)
+ override def hasNext() = {
+ super.hasNext() && comparator.compare(peekKey(), to) < 0
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 8fd33f1..68e9e66 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -126,9 +126,7 @@ object TestKeyValuePerformance extends Logging {
}
test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
-
info("Cleaning up output directory for %s." format storeName)
-
Util.rm(output)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index eefe114..2082610 100644
--- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -55,36 +55,38 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
@Before
def setup() {
- val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) {
- dir.mkdirs()
- val leveldb = new LevelDbKeyValueStore(dir, new Options)
- leveldb
- } else if ("inmemory".equals(typeOfStore)) {
- val inmemoryDb = new InMemoryKeyValueStore
- inmemoryDb
- } else {
- throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
+ val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match {
+ case "leveldb" =>
+ dir.mkdirs ()
+ new LevelDbKeyValueStore (dir, new Options)
+ case "inmemory" =>
+ new InMemoryKeyValueStore
+ case "rocksdb" =>
+ new RocksDbKeyValueStore (dir, new org.rocksdb.Options().setCreateIfMissing(true).setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION))
+ case _ =>
+ throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
}
val passThroughSerde = new Serde[Array[Byte]] {
def toBytes(obj: Array[Byte]) = obj
def fromBytes(bytes: Array[Byte]) = bytes
}
- store = if ("cache".equals(storeConfig)) {
- cache = true
- new CachedStore(kvStore, CacheSize, BatchSize)
- } else if ("serde".equals(storeConfig)) {
- serde = true
- new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
- } else if ("cache-and-serde".equals(storeConfig)) {
- val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
- serde = true
- cache = true
- new CachedStore(serializedStore, CacheSize, BatchSize)
- } else {
- kvStore
- }
+ store = storeConfig match {
+ case "cache" =>
+ cache = true
+ new CachedStore(kvStore, CacheSize, BatchSize)
+ case "serde" =>
+ serde = true
+ new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+ case "cache-and-serde" =>
+ val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+ serde = true
+ cache = true
+ new CachedStore(serializedStore, CacheSize, BatchSize)
+ case _ =>
+ kvStore
+ }
store = new NullSafeKeyValueStore(store)
}
@@ -215,14 +217,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
def testBrokenScalaDoubleLinkedList() {
val something = b("")
val keys = letters
- .map(b(_))
- .toArray
+ .map(b(_))
+ .toArray
// Load the cache to capacity.
letters
- .slice(0, TestKeyValueStores.CacheSize)
- .map(b(_))
- .foreach(store.put(_, something))
+ .slice(0, TestKeyValueStores.CacheSize)
+ .map(b(_))
+ .foreach(store.put(_, something))
// Now keep everything in the cache, but with an empty dirty list.
store.flush
@@ -247,9 +249,9 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
// Get rid of 1 from the cache by reading every other element, and then
// putting one new element.
letters
- .slice(2, TestKeyValueStores.CacheSize)
- .map(b(_))
- .foreach(store.get(_))
+ .slice(2, TestKeyValueStores.CacheSize)
+ .map(b(_))
+ .foreach(store.get(_))
store.put(keys(TestKeyValueStores.CacheSize), something)
// Now try and trigger an NPE since the dirty list has an element (1)
@@ -266,8 +268,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
// Make test deterministic by seeding the random number generator.
val rand = new Random(12345)
val keys = letters
- .map(b(_))
- .toArray
+ .map(b(_))
+ .toArray
// Map from letter to key byte array used for letter, and expected value.
// We have to go through some acrobatics here since Java's byte array uses
@@ -340,5 +342,20 @@ object TestKeyValueStores {
val CacheSize = 10
val BatchSize = 5
@Parameters
- def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none"))
+ def parameters: java.util.Collection[Array[String]] = Arrays.asList(
+ //LevelDB
+ Array("leveldb", "cache"),
+ Array("leveldb", "serde"),
+ Array("leveldb", "cache-and-serde"),
+ Array("leveldb", "none"),
+ //Inmemory
+ Array("inmemory", "cache"),
+ Array("inmemory", "serde"),
+ Array("inmemory", "cache-and-serde"),
+ Array("inmemory", "none"),
+ //RocksDB
+ Array("rocksdb","cache"),
+ Array("rocksdb","serde"),
+ Array("rocksdb","cache-and-serde"),
+ Array("rocksdb","none"))
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 325cac2..216c5ee 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,6 +23,7 @@ include \
'samza-kv',
'samza-kv-inmemory',
'samza-kv-leveldb',
+ 'samza-kv-rocksdb',
'samza-log4j',
'samza-serializers',
'samza-shell',