Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/16 17:48:25 UTC

git commit: SAMZA-236; add a rocksdb state implementation

Repository: incubator-samza
Updated Branches:
  refs/heads/master a66a66c53 -> 9147c343b


SAMZA-236; add a rocksdb state implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9147c343
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9147c343
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9147c343

Branch: refs/heads/master
Commit: 9147c343be9a85d49630bd7522c79e825f5682f2
Parents: a66a66c
Author: Naveen Somasundaram <na...@gmail.com>
Authored: Thu Oct 16 08:48:12 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Oct 16 08:48:12 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |  18 +-
 gradle/dependency-versions.gradle               |   1 +
 .../samza/util/LexicographicComparator.scala    |  39 ++++
 .../samza/storage/kv/LevelDbKeyValueStore.scala |  17 +-
 .../RocksDbKeyValueStorageEngineFactory.scala   |  50 +++++
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 224 +++++++++++++++++++
 .../performance/TestKeyValuePerformance.scala   |   2 -
 .../samza/storage/kv/TestKeyValueStores.scala   |  85 ++++---
 settings.gradle                                 |   1 +
 9 files changed, 386 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
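
This patch adds a samza-kv-rocksdb module that mirrors the existing LevelDB
store: a storage engine factory, the store itself, and a lexicographic
byte-array comparator hoisted into samza-core so both bindings can share it.
As a rough sketch of how a job might enable the new store (the store name and
serdes here are hypothetical; the factory's fully qualified name follows the
package declared in this patch):

  stores.my-store.factory=org.apache.samza.storage.RocksDbKeyValueStorageEngineFactory
  stores.my-store.key.serde=string
  stores.my-store.msg.serde=string
  # Optional keys read by RocksDbKeyValueStore.options (see below):
  stores.my-store.rocksdb.compression=snappy
  stores.my-store.rocksdb.block.size.bytes=4096
  stores.my-store.rocksdb.compaction.style=universal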


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1b37dbb..99f4354 100644
--- a/build.gradle
+++ b/build.gradle
@@ -33,7 +33,8 @@ allprojects {
     }
     mavenCentral()
     mavenLocal()
-  }
+   }
+
 }
 
 apply from: file("gradle/dependency-versions.gradle")
@@ -345,6 +346,20 @@ project(":samza-kv-leveldb_$scalaVersion") {
   }
 }
 
+project(":samza-kv-rocksdb_$scalaVersion") {
+  apply plugin: 'scala'
+
+  dependencies {
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile project(":samza-kv_$scalaVersion")
+    compile "org.scala-lang:scala-library:$scalaLibVersion"
+    compile "org.rocksdb:rocksdbjni:$rocksdbVersion"
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+  }
+}
+
 project(":samza-test_$scalaVersion") {
   apply plugin: 'scala'
 
@@ -359,6 +374,7 @@ project(":samza-test_$scalaVersion") {
     compile project(':samza-api')
     compile project(":samza-kv-inmemory_$scalaVersion")
     compile project(":samza-kv-leveldb_$scalaVersion")
+    compile project(":samza-kv-rocksdb_$scalaVersion")
     compile project(":samza-core_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index fe2e446..44dd426 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -29,6 +29,7 @@
   kafkaVersion = "0.8.1.1"
   commonsHttpClientVersion = "3.1"
   leveldbVersion = "1.8"
+  rocksdbVersion = "3.5.1"
   yarnVersion = "2.4.0"
   slf4jVersion = "1.6.2"
   log4jVersion = "1.2.17"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
new file mode 100644
index 0000000..93c5220
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.Comparator
+
+/**
+ * A comparator that applies a lexicographical comparison on byte arrays.
+ */
+class LexicographicComparator extends Comparator[Array[Byte]] {
+  def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
+    val l = math.min(k1.length, k2.length)
+    var i = 0
+    while (i < l) {
+      if (k1(i) != k2(i))
+        return (k1(i) & 0xff) - (k2(i) & 0xff)
+      i += 1
+    }
+      // Prefixes are equal, so the shorter array is less.
+    k1.length - k2.length
+  }
+}
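
The comparator above is the code previously embedded in LevelDbKeyValueStore,
extracted so the RocksDB store can reuse it. A minimal sketch of its
unsigned-byte ordering (the arrays are made up for illustration):

  import java.util.Arrays
  import org.apache.samza.util.LexicographicComparator

  val cmp = new LexicographicComparator()
  // -1 is 0xff unsigned, so it sorts after anything starting with 0x00,
  // and a strict prefix sorts before its extensions.
  val keys = Array(Array[Byte](-1), Array[Byte](0, 1), Array[Byte](0))
  Arrays.sort(keys, cmp) // order: [0x00], [0x00, 0x01], [0xff]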

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index 853de12..f4a021a 100644
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -24,7 +24,7 @@ import org.fusesource.leveldbjni.JniDBFactory._
 import java.io._
 import org.apache.samza.config.Config
 import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{LexicographicComparator, Logging}
 
 object LevelDbKeyValueStore extends Logging {
 
@@ -64,7 +64,7 @@ class LevelDbKeyValueStore(
   val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
 
   private lazy val db = factory.open(dir, options)
-  private val lexicographic = new LexicographicComparator()
+  private val lexicographic = new LevelDBLexicographicComparator()
   private var deletesSinceLastCompaction = 0
 
   def get(key: Array[Byte]): Array[Byte] = {
@@ -209,18 +209,7 @@ class LevelDbKeyValueStore(
   /**
    * Compare two arrays lexicographically using unsigned byte arithmetic
    */
-  class LexicographicComparator extends DBComparator {
-    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
-      val l = math.min(k1.length, k2.length)
-      var i = 0
-      while (i < l) {
-        if (k1(i) != k2(i))
-          return (k1(i) & 0xff) - (k2(i) & 0xff)
-        i += 1
-      }
-      // okay prefixes are equal, the shorter array is less
-      k1.length - k2.length
-    }
+  class LevelDBLexicographicComparator extends LexicographicComparator with DBComparator {
     def name(): String = "lexicographic"
     def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
     def findShortSuccessor(key: Array[Byte]) = key

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..a52731b
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.io.File
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.storage.kv._
+import org.apache.samza.system.SystemStreamPartition
+
+class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V]
+{
+  /**
+   * Return a KeyValueStore instance for the given store name
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A valid KeyValueStore instance
+   */
+  override def getKVStore(storeName: String,
+                          storeDir: File,
+                          registry: MetricsRegistry,
+                          changeLogSystemStreamPartition: SystemStreamPartition,
+                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
+    val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
+    val rocksDbOptions = RocksDbKeyValueStore.options(storageConfig, containerContext)
+    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbMetrics)
+    rocksDb
+  }
+}
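
One note on the config plumbing above: config.subset("stores." + storeName + ".", true)
strips the store prefix, so job-level keys reach RocksDbKeyValueStore.options
under their short names. A hypothetical illustration with a store named
my-store:

  // In the job config:  stores.my-store.rocksdb.compression=snappy
  // Seen by options():  rocksdb.compression=snappy
  val storageConfig = containerContext.config.subset("stores.my-store.", true)
  storageConfig.get("rocksdb.compression", "snappy") // "snappy"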

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
new file mode 100644
index 0000000..4b23c9b
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+import org.apache.samza.util.{ LexicographicComparator, Logging }
+import org.apache.samza.config.Config
+import org.apache.samza.container.SamzaContainerContext
+import org.rocksdb._
+
+object RocksDbKeyValueStore extends Logging {
+  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
+    val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
+    val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
+    val options = new Options();
+
+    // Cache size and write buffer size are specified on a per-container basis.
+    val numTasks = containerContext.taskNames.size
+    options.setWriteBufferSize((writeBufSize / numTasks).toInt)
+    var cacheSizePerContainer = cacheSize / numTasks
+    options.setCompressionType(
+      storeConfig.get("rocksdb.compression", "snappy") match {
+        case "snappy" => CompressionType.SNAPPY_COMPRESSION
+        case "bzip2" => CompressionType.BZLIB2_COMPRESSION
+        case "zlib" => CompressionType.ZLIB_COMPRESSION
+        case "lz4" => CompressionType.LZ4_COMPRESSION
+        case "lz4hc" => CompressionType.LZ4HC_COMPRESSION
+        case "none" => CompressionType.NO_COMPRESSION
+        case _ =>
+          warn("Unknown rocksdb.compression codec %s, defaulting to Snappy" format storeConfig.get("rocksdb.compression", "snappy"))
+          CompressionType.SNAPPY_COMPRESSION
+      })
+
+    val blockSize = storeConfig.getInt("rocksdb.block.size.bytes", 4096)
+    // The cache size is derived from the overall container cache size, but
+    // an explicit rocksdb.cache.size.bytes setting overrides it.
+    cacheSizePerContainer = storeConfig.getLong("rocksdb.cache.size.bytes", cacheSizePerContainer)
+    val bloomBits = storeConfig.getInt("rocksdb.bloomfilter.bits", 10)
+    val table_options = new BlockBasedTableConfig()
+    table_options.setBlockCacheSize(cacheSizePerContainer)
+      .setBlockSize(blockSize)
+      .setFilterBitsPerKey(bloomBits)
+
+    options.setTableFormatConfig(table_options)
+    options.setCompactionStyle(
+      storeConfig.get("rocksdb.compaction.style", "universal") match {
+        case "universal" => CompactionStyle.UNIVERSAL
+        case "fifo" => CompactionStyle.FIFO
+        case "level" => CompactionStyle.LEVEL
+        case _ =>
+          warn("Unknown rocksdb.compaction.style %s, defaulting to universal" format storeConfig.get("rocksdb.compaction.style", "universal"))
+          CompactionStyle.UNIVERSAL
+      })
+
+    options.setMaxWriteBufferNumber(storeConfig.get("rocksdb.num.write.buffers", "3").toInt)
+    options.setCreateIfMissing(true)
+    options.setErrorIfExists(true)
+    options
+  }
+
+}
+
+class RocksDbKeyValueStore(
+  val dir: File,
+  val options: Options,
+  val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+
+  private lazy val db = RocksDB.open(options, dir.toString)
+  private val lexicographic = new LexicographicComparator()
+  private var deletesSinceLastCompaction = 0
+
+  def get(key: Array[Byte]): Array[Byte] = {
+    metrics.gets.inc
+    require(key != null, "Null key not allowed.")
+    val found = db.get(key)
+    if (found != null) {
+      metrics.bytesRead.inc(found.size)
+    }
+    found
+  }
+
+  def put(key: Array[Byte], value: Array[Byte]) {
+    metrics.puts.inc
+    require(key != null, "Null key not allowed.")
+    if (value == null) {
+      db.remove(key)
+      deletesSinceLastCompaction += 1
+    } else {
+      metrics.bytesWritten.inc(key.size + value.size)
+      db.put(key, value)
+    }
+  }
+
+  // The RocksDB write batch API is not used yet because of: https://github.com/facebook/rocksdb/issues/262
+  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
+    val iter = entries.iterator
+    var wrote = 0
+    var deletes = 0
+    while (iter.hasNext) {
+      wrote += 1
+      val curr = iter.next()
+      if (curr.getValue == null) {
+        deletes += 1
+        db.remove(curr.getKey);
+      } else {
+        val key = curr.getKey
+        val value = curr.getValue
+        metrics.bytesWritten.inc(key.size + value.size)
+        db.put(key, value)
+      }
+    }
+    metrics.puts.inc(wrote)
+    metrics.deletes.inc(deletes)
+    deletesSinceLastCompaction += deletes
+  }
+
+  def delete(key: Array[Byte]) {
+    metrics.deletes.inc
+    put(key, null)
+  }
+
+  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.ranges.inc
+    require(from != null && to != null, "Null bound not allowed.")
+    new RocksDbRangeIterator(db.newIterator(), from, to)
+  }
+
+  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.alls.inc
+    val iter = db.newIterator()
+    iter.seekToFirst()
+    new RocksDbIterator(iter)
+  }
+
+  def flush {
+    metrics.flushes.inc
+    // TODO: Flush is not yet exposed in the RocksDB Java API; follow up with the RocksDB team.
+    trace("Flush in RocksDbKeyValueStore is not supported, ignoring")
+  }
+
+  def close() {
+    trace("Closing.")
+    db.close()
+  }
+
+  class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
+    private var open = true
+    private var firstValueAccessed = false;
+    def close() = {
+      open = false
+      iter.dispose()
+    }
+
+    def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove");
+
+    def hasNext() = iter.isValid
+
+    // The iterator is already pointing to the next element
+    protected def peekKey() = {
+      getEntry().getKey
+    }
+
+    protected def getEntry() = {
+      val key = iter.key
+      val value = iter.value
+      new Entry(key, value)
+    }
+
+    // RocksIterator differs from a standard Java iterator: it always points
+    // at the current element. When next() is called, we return the element
+    // the iterator currently points to and then advance it to the next
+    // location. The new location may or may not be valid; if it is not,
+    // isValid will return false, which surfaces on the next hasNext()
+    // call.
+    def next() = {
+      if (!hasNext()) {
+        throw new NoSuchElementException
+      }
+
+      val entry = getEntry()
+      iter.next
+      metrics.bytesRead.inc(entry.getKey.size)
+      if (entry.getValue != null) {
+        metrics.bytesRead.inc(entry.getValue.size)
+      }
+      entry
+    }
+
+    override def finalize() {
+      if (open) {
+        trace("Leaked reference to RocksDB iterator, forcing close.")
+        close()
+      }
+    }
+  }
+
+  class RocksDbRangeIterator(iter: RocksIterator, from: Array[Byte], to: Array[Byte]) extends RocksDbIterator(iter) {
+    // RocksDB's JNI interface does not expose getters/setters that would let
+    // the comparator be pluggable, and the default is lexicographic, so it's
+    // safe to force the lexicographic comparator here for now.
+    val comparator = lexicographic
+    iter.seek(from)
+    override def hasNext() = {
+      super.hasNext() && comparator.compare(peekKey(), to) < 0
+    }
+  }
+}
\ No newline at end of file
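
The store can also be constructed directly, which is what the tests below do.
A minimal sketch, assuming defaults similar to this patch (the directory path
is hypothetical):

  import java.io.File
  import org.rocksdb.{ CompressionType, Options }
  import org.apache.samza.storage.kv.RocksDbKeyValueStore

  val dir = new File("/tmp/rocksdb-example")
  dir.mkdirs()
  val options = new Options()
    .setCreateIfMissing(true)
    .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
  val store = new RocksDbKeyValueStore(dir, options)
  store.put("hello".getBytes("UTF-8"), "world".getBytes("UTF-8"))
  val value = store.get("hello".getBytes("UTF-8")) // "world" as bytes
  store.close()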

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 8fd33f1..68e9e66 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -126,9 +126,7 @@ object TestKeyValuePerformance extends Logging {
       }
 
       test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
-
       info("Cleaning up output directory for %s." format storeName)
-
       Util.rm(output)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index eefe114..2082610 100644
--- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -55,36 +55,38 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
 
   @Before
   def setup() {
-    val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) {
-      dir.mkdirs()
-      val leveldb = new LevelDbKeyValueStore(dir, new Options)
-      leveldb
-    } else if ("inmemory".equals(typeOfStore)) {
-      val inmemoryDb = new InMemoryKeyValueStore
-      inmemoryDb
-    } else {
-      throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
+    val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match {
+      case "leveldb" =>
+        dir.mkdirs ()
+        new LevelDbKeyValueStore (dir, new Options)
+      case "inmemory" =>
+        new InMemoryKeyValueStore
+      case "rocksdb" =>
+        new RocksDbKeyValueStore (dir, new org.rocksdb.Options().setCreateIfMissing(true).setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION))
+      case _ =>
+        throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
     }
 
     val passThroughSerde = new Serde[Array[Byte]] {
       def toBytes(obj: Array[Byte]) = obj
       def fromBytes(bytes: Array[Byte]) = bytes
     }
-    store = if ("cache".equals(storeConfig)) {
-      cache = true
-      new CachedStore(kvStore, CacheSize, BatchSize)
-    } else if ("serde".equals(storeConfig)) {
-      serde = true
-      new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
-    } else if ("cache-and-serde".equals(storeConfig)) {
-      val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
-      serde = true
-      cache = true
-      new CachedStore(serializedStore, CacheSize, BatchSize)
-    } else {
-      kvStore
-    }
 
+    store = storeConfig match {
+      case "cache" =>
+        cache = true
+        new CachedStore(kvStore, CacheSize, BatchSize)
+      case "serde" =>
+        serde = true
+        new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+      case "cache-and-serde" =>
+        val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+        serde = true
+        cache = true
+        new CachedStore(serializedStore, CacheSize, BatchSize)
+      case _ =>
+        kvStore
+    }
     store = new NullSafeKeyValueStore(store)
   }
 
@@ -215,14 +217,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testBrokenScalaDoubleLinkedList() {
     val something = b("")
     val keys = letters
-      .map(b(_))
-      .toArray
+            .map(b(_))
+            .toArray
 
     // Load the cache to capacity.
     letters
-      .slice(0, TestKeyValueStores.CacheSize)
-      .map(b(_))
-      .foreach(store.put(_, something))
+            .slice(0, TestKeyValueStores.CacheSize)
+            .map(b(_))
+            .foreach(store.put(_, something))
 
     // Now keep everything in the cache, but with an empty dirty list.
     store.flush
@@ -247,9 +249,9 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     // Get rid of 1 from the cache by reading every other element, and then 
     // putting one new element.
     letters
-      .slice(2, TestKeyValueStores.CacheSize)
-      .map(b(_))
-      .foreach(store.get(_))
+            .slice(2, TestKeyValueStores.CacheSize)
+            .map(b(_))
+            .foreach(store.get(_))
     store.put(keys(TestKeyValueStores.CacheSize), something)
 
     // Now try and trigger an NPE since the dirty list has an element (1) 
@@ -266,8 +268,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     // Make test deterministic by seeding the random number generator.
     val rand = new Random(12345)
     val keys = letters
-      .map(b(_))
-      .toArray
+            .map(b(_))
+            .toArray
 
     // Map from letter to key byte array used for letter, and expected value.
     // We have to go through some acrobatics here since Java's byte array uses 
@@ -340,5 +342,20 @@ object TestKeyValueStores {
   val CacheSize = 10
   val BatchSize = 5
   @Parameters
-  def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none"))
+  def parameters: java.util.Collection[Array[String]] = Arrays.asList(
+      //LevelDB
+      Array("leveldb", "cache"),
+      Array("leveldb", "serde"),
+      Array("leveldb", "cache-and-serde"),
+      Array("leveldb", "none"),
+      //Inmemory
+      Array("inmemory", "cache"),
+      Array("inmemory", "serde"),
+      Array("inmemory", "cache-and-serde"),
+      Array("inmemory", "none"),
+      //RocksDB
+      Array("rocksdb","cache"),
+      Array("rocksdb","serde"),
+      Array("rocksdb","cache-and-serde"),
+      Array("rocksdb","none"))
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 325cac2..216c5ee 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,6 +23,7 @@ include \
   'samza-kv',
   'samza-kv-inmemory',
   'samza-kv-leveldb',
+  'samza-kv-rocksdb',
   'samza-log4j',
   'samza-serializers',
   'samza-shell',