You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/10 23:09:44 UTC

git commit: SAMZA-256; write an in-memory store

Repository: incubator-samza
Updated Branches:
  refs/heads/master 8edb3fed7 -> 46bc23b92


SAMZA-256; write an in-memory store


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/46bc23b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/46bc23b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/46bc23b9

Branch: refs/heads/master
Commit: 46bc23b926419751c546d1f59c1c241b56d5cbce
Parents: 8edb3fe
Author: Chinmay Soman <ch...@gmail.com>
Authored: Thu Jul 10 14:07:52 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Jul 10 14:07:52 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |  30 +-
 gradle/dependency-versions.gradle               |   1 +
 .../InMemoryKeyValueStorageEngineFactory.scala  |  41 +++
 .../kv/inmemory/InMemoryKeyValueStore.scala     | 115 +++++++
 .../kv/KeyValueStorageEngineFactory.scala       |  42 +++
 .../LevelDbKeyValueStorageEngineFactory.scala   |  61 ++++
 .../samza/storage/kv/LevelDbKeyValueStore.scala | 234 +++++++++++++
 .../samza/storage/kv/TestKeyValueStores.scala   | 344 +++++++++++++++++++
 .../kv/BaseKeyValueStorageEngineFactory.scala   | 118 +++++++
 .../apache/samza/storage/kv/CachedStore.scala   |   3 +-
 .../samza/storage/kv/CachedStoreMetrics.scala   |   3 +-
 .../storage/kv/KeyValueStorageEngine.scala      |  10 +-
 .../kv/KeyValueStorageEngineFactory.scala       |  86 -----
 .../kv/KeyValueStorageEngineMetrics.scala       |   3 +-
 .../samza/storage/kv/KeyValueStoreMetrics.scala |  38 ++
 .../samza/storage/kv/LevelDbKeyValueStore.scala | 234 -------------
 .../kv/LevelDbKeyValueStoreMetrics.scala        |  41 ---
 .../apache/samza/storage/kv/LoggedStore.scala   |   7 +-
 .../samza/storage/kv/LoggedStoreMetrics.scala   |   3 +-
 .../storage/kv/NullSafeKeyValueStore.scala      |   1 +
 .../storage/kv/SerializedKeyValueStore.scala    |   3 +-
 .../kv/SerializedKeyValueStoreMetrics.scala     |   3 +-
 .../samza/storage/kv/TestKeyValueStores.scala   | 316 -----------------
 .../performance/TestKeyValuePerformance.scala   |   2 -
 settings.gradle                                 |   2 +-
 25 files changed, 1037 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index c853fc0..6d0ac97 100644
--- a/build.gradle
+++ b/build.gradle
@@ -291,6 +291,34 @@ project(":samza-kv_$scalaVersion") {
     compile project(":samza-core_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
+    testCompile "junit:junit:$junitVersion"
+  }
+}
+
+project(":samza-kv-inmemory_$scalaVersion") { // in-memory key-value store module; builds on samza-kv
+  apply plugin: 'scala'
+
+  dependencies {
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile project(":samza-kv_$scalaVersion")
+    compile "org.scala-lang:scala-library:$scalaLibVersion"
+    compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
+    compile "com.google.guava:guava:$guavaVersion" // InMemoryKeyValueStore imports Guava's UnsignedBytes for its key comparator
+    testCompile "junit:junit:$junitVersion"
+  }
+}
+
+project(":samza-kv-leveldb_$scalaVersion") {
+  apply plugin: 'scala'
+
+  dependencies {
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile project(":samza-kv_$scalaVersion")
+    compile project(":samza-kv-inmemory_$scalaVersion")
+    compile "org.scala-lang:scala-library:$scalaLibVersion"
+    compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
     compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
@@ -309,7 +337,7 @@ project(":samza-test_$scalaVersion") {
 
   dependencies {
     compile project(':samza-api')
-    compile project(":samza-kv_$scalaVersion")
+    compile project(":samza-kv-leveldb_$scalaVersion")
     compile project(":samza-core_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 0787258..7373582 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -31,4 +31,5 @@
   leveldbVersion = "1.8"
   yarnVersion = "2.4.0"
   slf4jVersion = "1.6.2"
+  guavaVersion = "17.0"
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..53147ad
--- /dev/null
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory
+
+import java.io.File
+
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.storage.kv.{KeyValueStoreMetrics, BaseKeyValueStorageEngineFactory, KeyValueStore}
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * Storage engine factory whose raw byte store is an InMemoryKeyValueStore.
+ */
+class InMemoryKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
+
+  /**
+   * Creates the in-memory byte store for the given store name.
+   *
+   * Only storeName and registry are used here (to build the store's
+   * metrics); the remaining arguments are not read by this implementation.
+   *
+   * @return a new InMemoryKeyValueStore reporting to freshly created metrics
+   */
+  override def getKVStore(
+    storeName: String,
+    storeDir: File,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] =
+    new InMemoryKeyValueStore(new KeyValueStoreMetrics(storeName, registry))
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
new file mode 100644
index 0000000..8e1493a
--- /dev/null
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory
+
+import com.google.common.primitives.UnsignedBytes
+import grizzled.slf4j.Logging
+import org.apache.samza.storage.kv.{KeyValueStoreMetrics, KeyValueIterator, Entry, KeyValueStore}
+import java.util
+
+/**
+ * In memory implementation of a key value store.
+ *
+ * This uses a TreeMap to store the keys in order. Keys are compared with
+ * Guava's UnsignedBytes.lexicographicalComparator.
+ *
+ * A null value passed to put (directly, or through putAll / delete) removes
+ * the key from the map.
+ *
+ * @param metrics A metrics instance to publish key-value store related statistics
+ */
+class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics)
+  extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+
+  val underlying = new util.TreeMap[Array[Byte], Array[Byte]] (UnsignedBytes.lexicographicalComparator())
+
+  /** Records a flush in the metrics; there is nothing to persist. */
+  override def flush(): Unit = {
+    // No-op for In memory store.
+    metrics.flushes.inc
+  }
+
+  /** Nothing to release for the in-memory store. */
+  override def close(): Unit = ()
+
+  /**
+   * Wraps the entries of the given sorted map (the whole store or a sub-map
+   * view of it) in a KeyValueIterator that records the bytes it reads.
+   */
+  private def getIter(tm:util.SortedMap[Array[Byte], Array[Byte]]) = {
+    new KeyValueIterator[Array[Byte], Array[Byte]] {
+      val iter = tm.entrySet().iterator()
+
+      override def close(): Unit = ()
+
+      // Delegates to the map's own entry iterator.
+      override def remove(): Unit = iter.remove()
+
+      override def next(): Entry[Array[Byte], Array[Byte]] = {
+        val n = iter.next()
+        if (n != null && n.getKey != null) {
+          metrics.bytesRead.inc(n.getKey.size)
+        }
+        if (n != null && n.getValue != null) {
+          metrics.bytesRead.inc(n.getValue.size)
+        }
+        new Entry(n.getKey, n.getValue)
+      }
+
+      override def hasNext: Boolean = iter.hasNext
+    }
+  }
+
+  /** Iterates over every entry in key order. */
+  override def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.alls.inc
+    getIter(underlying)
+  }
+
+  /**
+   * Iterates over the entries returned by TreeMap.subMap(from, to).
+   * Both bounds must be non-null.
+   */
+  override def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.ranges.inc
+    require(from != null && to != null, "Null bound not allowed.")
+    getIter(underlying.subMap(from, to))
+  }
+
+  /** Removes the key by writing a null value for it. */
+  override def delete(key: Array[Byte]): Unit = {
+    // put() already increments metrics.deletes when it is handed a null value;
+    // incrementing here as well would count every delete twice.
+    put(key, null)
+  }
+
+  /** Applies each entry through put, so null values act as deletes. */
+  override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = {
+    // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
+    // to use it, in order to putAll here.  Therefore, just iterate here.
+    val iter = entries.iterator()
+    while(iter.hasNext) {
+      val next = iter.next()
+      put(next.getKey, next.getValue)
+    }
+  }
+
+  /**
+   * Stores value under key, or removes key when value is null.
+   * The key must be non-null.
+   */
+  override def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    metrics.puts.inc
+    require(key != null, "Null key not allowed.")
+    if (value == null) {
+      metrics.deletes.inc
+      underlying.remove(key)
+    } else {
+      metrics.bytesWritten.inc(key.size + value.size)
+      underlying.put(key, value)
+    }
+  }
+
+  /** Returns the value stored under key, or null when the key is absent. */
+  override def get(key: Array[Byte]): Array[Byte] = {
+    metrics.gets.inc
+    require(key != null, "Null key not allowed.")
+    val found = underlying.get(key)
+    if (found != null) {
+      metrics.bytesRead.inc(found.size)
+    }
+    found
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..0ba4f8a
--- /dev/null
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * Backwards-compatibility shim: jobs that were configured with
+ * KeyValueStorageEngineFactory before the store implementations were split
+ * out keep working, and get a LevelDb-backed store.
+ */
+class KeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
+
+  /** Hands every argument straight to LevelDbKeyValueStorageEngineFactory.getKeyValueStore. */
+  override def getKVStore(
+    storeName: String,
+    storeDir: File,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] =
+    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(
+      storeName,
+      storeDir,
+      registry,
+      changeLogSystemStreamPartition,
+      containerContext)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..9642823
--- /dev/null
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+import org.apache.samza.config.Config
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StorageEngineFactory
+import org.apache.samza.storage.StorageEngine
+
+object LevelDbKeyValueStorageEngineFactory {
+
+  /**
+   * Builds a LevelDbKeyValueStore for the named store.
+   *
+   * Settings are read from the "stores.<storeName>." subset of the container
+   * config; "compaction.delete.threshold" defaults to -1 when absent.
+   * changeLogSystemStreamPartition is accepted but not used here.
+   */
+  def getKeyValueStore(
+    storeName: String,
+    storeDir: File,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+    val storeScopedConfig = containerContext.config.subset("stores." + storeName + ".", true)
+    val compactionThreshold = storeScopedConfig.getInt("compaction.delete.threshold", -1)
+    val storeMetrics = new KeyValueStoreMetrics(storeName, registry)
+    val dbOptions = LevelDbKeyValueStore.options(storeScopedConfig, containerContext)
+
+    new LevelDbKeyValueStore(storeDir, dbOptions, compactionThreshold, storeMetrics)
+  }
+
+}
+
+/**
+ * Storage engine factory whose raw byte store is LevelDb-backed.
+ */
+class LevelDbKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
+
+  /** Delegates to the companion object's getKeyValueStore with the same arguments. */
+  override def getKVStore(
+    storeName: String,
+    storeDir: File,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] =
+    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(
+      storeName,
+      storeDir,
+      registry,
+      changeLogSystemStreamPartition,
+      containerContext)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
new file mode 100644
index 0000000..751fe4c
--- /dev/null
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.nio.ByteBuffer
+import org.iq80.leveldb._
+import org.fusesource.leveldbjni.internal.NativeComparator
+import org.fusesource.leveldbjni.JniDBFactory._
+import java.io._
+import java.util.Iterator
+import java.lang.Iterable
+import org.apache.samza.config.Config
+import org.apache.samza.container.SamzaContainerContext
+import grizzled.slf4j.{ Logger, Logging }
+
+object LevelDbKeyValueStore {
+  private lazy val logger = Logger(classOf[LevelDbKeyValueStore])
+
+  /**
+   * Translates a store's configuration into LevelDB Options.
+   *
+   * The configured cache and write-buffer sizes are container-wide totals and
+   * are divided by the number of partitions in the container. An unrecognised
+   * "leveldb.compression" value is logged and replaced by Snappy.
+   */
+  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
+    val containerCacheBytes = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
+    val containerWriteBufferBytes = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
+    val options = new Options
+
+    // Cache size and write buffer size are specified on a per-container basis.
+    val cacheBytes = containerCacheBytes / containerContext.partitions.size
+    options.cacheSize(cacheBytes)
+    val writeBufferBytes = (containerWriteBufferBytes / containerContext.partitions.size).toInt
+    options.writeBufferSize(writeBufferBytes)
+
+    val blockBytes = storeConfig.getInt("leveldb.block.size.bytes", 4096)
+    options.blockSize(blockBytes)
+
+    val codec = storeConfig.get("leveldb.compression", "snappy") match {
+      case "none" => CompressionType.NONE
+      case "snappy" => CompressionType.SNAPPY
+      case unknown =>
+        logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format unknown)
+        CompressionType.SNAPPY
+    }
+    options.compressionType(codec)
+
+    options.createIfMissing(true)
+    options.errorIfExists(true)
+    options
+  }
+}
+
+/**
+ * A KeyValueStore over raw byte arrays backed by a LevelDB database.
+ *
+ * The database is opened lazily, the first time it is needed. A null value
+ * passed to put (directly, or through putAll / delete) deletes the key.
+ *
+ * @param dir directory handed to the LevelDB factory when opening the database
+ * @param options LevelDB options used to open the database
+ * @param deleteCompactionThreshold number of deletes after which the next
+ *        get / range / all forces a full compaction; -1 disables this
+ * @param metrics metrics instance updated by every operation
+ */
+class LevelDbKeyValueStore(
+  val dir: File,
+  val options: Options,
+
+  /**
+   * How many deletes must occur before we will force a compaction. This is to
+   * get around performance issues discovered in SAMZA-254. A value of -1
+   * disables this feature.
+   */
+  val deleteCompactionThreshold: Int = -1,
+  val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+
+  // Opened on first access rather than at construction time.
+  private lazy val db = factory.open(dir, options)
+  // Fallback ordering for range scans when the options carry no comparator.
+  private val lexicographic = new LexicographicComparator()
+  private var deletesSinceLastCompaction = 0
+
+  /** Returns the value stored under key, or null when the key is absent. */
+  def get(key: Array[Byte]): Array[Byte] = {
+    maybeCompact
+    metrics.gets.inc
+    require(key != null, "Null key not allowed.")
+    val found = db.get(key)
+    if (found != null) {
+      metrics.bytesRead.inc(found.size)
+    }
+    found
+  }
+
+  /**
+   * Stores value under key, or deletes key when value is null.
+   * The key must be non-null.
+   */
+  def put(key: Array[Byte], value: Array[Byte]) {
+    metrics.puts.inc
+    require(key != null, "Null key not allowed.")
+    if (value == null) {
+      db.delete(key)
+      deletesSinceLastCompaction += 1
+    } else {
+      metrics.bytesWritten.inc(key.size + value.size)
+      db.put(key, value)
+    }
+  }
+
+  /**
+   * Writes all entries as one LevelDB write batch. Entries with a null value
+   * are written as deletes.
+   */
+  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
+    val batch = db.createWriteBatch()
+    var wrote = 0
+    var deletes = 0
+    // The batch must be closed even when building or writing it fails,
+    // otherwise it is leaked.
+    try {
+      val iter = entries.iterator
+      while (iter.hasNext) {
+        wrote += 1
+        val curr = iter.next()
+        if (curr.getValue == null) {
+          deletes += 1
+          batch.delete(curr.getKey)
+        } else {
+          val key = curr.getKey
+          val value = curr.getValue
+          metrics.bytesWritten.inc(key.size + value.size)
+          batch.put(key, value)
+        }
+      }
+      db.write(batch)
+    } finally {
+      batch.close
+    }
+    metrics.puts.inc(wrote)
+    metrics.deletes.inc(deletes)
+    deletesSinceLastCompaction += deletes
+  }
+
+  /** Deletes the key by writing a null value for it. */
+  def delete(key: Array[Byte]) {
+    metrics.deletes.inc
+    put(key, null)
+  }
+
+  /**
+   * Iterates over entries starting at from and stopping before the first key
+   * that is not less than to. Both bounds must be non-null.
+   */
+  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    maybeCompact
+    metrics.ranges.inc
+    require(from != null && to != null, "Null bound not allowed.")
+    new LevelDbRangeIterator(db.iterator, from, to)
+  }
+
+  /** Iterates over every entry, starting from the first key. */
+  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    maybeCompact
+    metrics.alls.inc
+    val iter = db.iterator()
+    iter.seekToFirst()
+    new LevelDbIterator(iter)
+  }
+
+  /**
+   * Trigger a complete compaction on the LevelDB store if there have been at
+   * least deleteCompactionThreshold deletes since the last compaction.
+   */
+  def maybeCompact = {
+    if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
+      compact
+    }
+  }
+
+  /**
+   * Trigger a complete compaction of the LevelDB store.
+   */
+  def compact {
+    // According to LevelDB's docs:
+    // begin==NULL is treated as a key before all keys in the database.
+    // end==NULL is treated as a key after all keys in the database.
+    db.compactRange(null, null)
+    deletesSinceLastCompaction = 0
+  }
+
+  /** Records a flush in the metrics; no data is written by this call. */
+  def flush {
+    metrics.flushes.inc
+    // TODO can't find a flush for leveldb
+    trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
+  }
+
+  /** Closes the underlying database. */
+  def close() {
+    trace("Closing.")
+
+    db.close()
+  }
+
+  /**
+   * Adapts a LevelDB DBIterator to KeyValueIterator, recording the bytes of
+   * every key and value it returns.
+   */
+  class LevelDbIterator(iter: DBIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
+    // Tracks whether close() has been called, so finalize can detect leaks.
+    private var open = true
+    def close() = {
+      open = false
+      iter.close()
+    }
+    def remove() = iter.remove()
+    def hasNext() = iter.hasNext()
+    def next() = {
+      if (!hasNext()) {
+        throw new NoSuchElementException
+      }
+
+      val curr = iter.next
+      val key = curr.getKey
+      val value = curr.getValue
+      metrics.bytesRead.inc(key.size)
+      if (value != null) {
+        metrics.bytesRead.inc(value.size)
+      }
+      new Entry(key, value)
+    }
+    // Safety net: closes the iterator if the caller never did.
+    override def finalize() {
+      if (open) {
+        System.err.println("Leaked reference to level db iterator, forcing close.")
+        close()
+      }
+    }
+  }
+
+  /**
+   * Iterator positioned at from that reports no further elements once the
+   * next key compares greater than or equal to to.
+   */
+  class LevelDbRangeIterator(iter: DBIterator, from: Array[Byte], to: Array[Byte]) extends LevelDbIterator(iter) {
+    val comparator = if (options.comparator == null) lexicographic else options.comparator
+    iter.seek(from)
+    override def hasNext() = {
+      iter.hasNext() && comparator.compare(iter.peekNext.getKey, to) < 0
+    }
+  }
+
+  /**
+   * Compare two array lexicographically using unsigned byte arithmetic
+   */
+  class LexicographicComparator extends DBComparator {
+    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
+      val l = math.min(k1.length, k2.length)
+      var i = 0
+      while (i < l) {
+        if (k1(i) != k2(i))
+          return (k1(i) & 0xff) - (k2(i) & 0xff)
+        i += 1
+      }
+      // okay prefixes are equal, the shorter array is less
+      k1.length - k2.length
+    }
+    def name(): String = "lexicographic"
+    def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
+    def findShortSuccessor(key: Array[Byte]) = key
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
new file mode 100644
index 0000000..d6d437a
--- /dev/null
+++ b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+import java.util.Arrays
+import java.util.Random
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
+
+import scala.collection.JavaConversions._
+import org.iq80.leveldb.Options
+import org.junit.After
+import org.junit.Assert._
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.apache.samza.serializers.Serde
+import org.scalatest.Assertions.intercept
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Test suite to check different key value store operations
+ * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / "inmemory")
+ * @param storeConfig Defines whether we're using caching / serde / both / or none in front of the store
+ */
+@RunWith(value = classOf[Parameterized])
+class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
+  import TestKeyValueStores._
+
+  val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
+  val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
+  var store: KeyValueStore[Array[Byte], Array[Byte]] = null
+  var cache = false
+  var serde = false
+
+  @Before
+  def setup() {
+    val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) {
+      dir.mkdirs()
+      val leveldb = new LevelDbKeyValueStore(dir, new Options)
+      leveldb
+    } else if ("inmemory".equals(typeOfStore)) {
+      val inmemoryDb = new InMemoryKeyValueStore
+      inmemoryDb
+    } else {
+      throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
+    }
+
+    val passThroughSerde = new Serde[Array[Byte]] {
+      def toBytes(obj: Array[Byte]) = obj
+      def fromBytes(bytes: Array[Byte]) = bytes
+    }
+    store = if ("cache".equals(storeConfig)) {
+      cache = true
+      new CachedStore(kvStore, CacheSize, BatchSize)
+    } else if ("serde".equals(storeConfig)) {
+      serde = true
+      new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+    } else if ("cache-and-serde".equals(storeConfig)) {
+      val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+      serde = true
+      cache = true
+      new CachedStore(serializedStore, CacheSize, BatchSize)
+    } else {
+      kvStore
+    }
+
+    store = new NullSafeKeyValueStore(store)
+  }
+
+  @After
+  def teardown() {
+    store.close
+    if (dir != null && dir.listFiles() != null) {
+      for (file <- dir.listFiles)
+        file.delete()
+      dir.delete()
+    }
+  }
+
+  @Test
+  def getNonExistantIsNull() {
+    assertNull(store.get(b("hello")))
+  }
+
+  @Test
+  def putAndGet() {
+    store.put(b("k"), b("v"))
+    assertTrue(Arrays.equals(b("v"), store.get(b("k"))))
+  }
+
+  @Test
+  def doublePutAndGet() {
+    val k = b("k2")
+    store.put(k, b("v1"))
+    store.put(k, b("v2"))
+    store.put(k, b("v3"))
+    assertTrue(Arrays.equals(b("v3"), store.get(k)))
+  }
+
+  @Test
+  def testNullsWithSerde() {
+    if (serde) {
+      val a = b("a")
+      val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
+      val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
+
+      intercept[NullPointerException] { store.get(null) }
+      intercept[NullPointerException] { store.delete(null) }
+      intercept[NullPointerException] { store.put(null, a) }
+      intercept[NullPointerException] { store.put(a, null) }
+      intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
+      intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
+      intercept[NullPointerException] { store.range(a, null) }
+      intercept[NullPointerException] { store.range(null, a) }
+    }
+  }
+
+  @Test
+  def testPutAll() {
+    // Use CacheSize - 1 so we fully fill the cache, but don't write any data 
+    // out. Our check (below) uses == for cached entries, and using 
+    // numEntries >= CacheSize would result in the LRU cache dropping some 
+    // entries. The result would be that we get the correct byte array back 
+    // from the cache's underlying store (leveldb), but that == would fail.
+    val numEntries = CacheSize - 1
+    val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
+    store.putAll(entries)
+    if (cache) {
+      assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue))
+    } else {
+      assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue)))
+    }
+  }
+
+  @Test
+  def testIterateAll() {
+    for (letter <- letters)
+      store.put(b(letter.toString), b(letter.toString))
+    val iter = store.all
+    checkRange(letters, iter)
+    iter.close()
+  }
+
+  @Test
+  def testRange() {
+    val from = 5
+    val to = 20
+    for (letter <- letters)
+      store.put(b(letter.toString), b(letter.toString))
+
+    val iter = store.range(b(letters(from)), b(letters(to)))
+    checkRange(letters.slice(from, to), iter)
+    iter.close()
+  }
+
+  @Test
+  def testDelete() {
+    val a = b("a")
+    assertNull(store.get(a))
+    store.put(a, a)
+    assertTrue(Arrays.equals(a, store.get(a)))
+    store.delete(a)
+    assertNull(store.get(a))
+  }
+
+  @Test
+  def testSimpleScenario() {
+    val vals = letters.map(b(_))
+    for (v <- vals) {
+      assertNull(store.get(v))
+      store.put(v, v)
+      assertTrue(Arrays.equals(v, store.get(v)))
+    }
+    vals.foreach(v => assertTrue(Arrays.equals(v, store.get(v))))
+    vals.foreach(v => store.delete(v))
+    vals.foreach(v => assertNull(store.get(v)))
+  }
+
+  /**
+   * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
+   * implementation. The issue is that it doesn't work. More specifically,
+   * creating a DoubleLinkedList from an existing list does not update the
+   * "prev" field of the existing list's head to point to the new head. As a
+   * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
+   * Samza gets around this by manually updating the field itself. See SAMZA-80
+   * for details.
+   *
+   * This issue is exposed in Samza's KV cache implementation, which uses
+   * DoubleLinkedList, so all comments in this method are discussing the cached
+   * implementation, but the test is still useful as a sanity check for
+   * non-cached stores.
+   */
+  @Test
+  def testBrokenScalaDoubleLinkedList() {
+    val something = b("")
+    val keys = letters
+      .map(b(_))
+      .toArray
+
+    // Load the cache to capacity.
+    letters
+      .slice(0, TestKeyValueStores.CacheSize)
+      .map(b(_))
+      .foreach(store.put(_, something))
+
+    // Now keep everything in the cache, but with an empty dirty list.
+    store.flush
+
+    // Dirty list is now empty, and every CacheEntry has dirty=null.
+
+    // Corrupt the dirty list by creating two dirty lists that toggle back and 
+    // forth depending on whether the last dirty write was to 1 or 0. The trick
+    // here is that every element in the cache is treated as the "head" of the
+    // DoubleLinkedList (prev==null), even though it's not necessarily. Thus,
+    // you can end up with multiple nodes each having their own version of the 
+    // dirty list with different elements in them.
+    store.put(keys(1), something)
+    store.put(keys(0), something)
+    store.put(keys(1), something)
+    store.flush
+    // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
+    store.put(keys(0), something)
+    // The dirty list now has 0 and 1, but 1's dirty field is null in the 
+    // cache because it was just flushed.
+
+    // Get rid of 1 from the cache by reading every other element, and then 
+    // putting one new element.
+    letters
+      .slice(2, TestKeyValueStores.CacheSize)
+      .map(b(_))
+      .foreach(store.get(_))
+    store.put(keys(TestKeyValueStores.CacheSize), something)
+
+    // Now try and trigger an NPE since the dirty list has an element (1) 
+    // that's no longer in the cache.
+    store.flush
+  }
+
+  /**
+   * A little test that tries to simulate a few common patterns:
+   * read-modify-write, and do-some-stuff-then-delete (windowing).
+   */
+  @Test
+  def testRandomReadWriteRemove() {
+    // Make test deterministic by seeding the random number generator.
+    val rand = new Random(12345)
+    val keys = letters
+      .map(b(_))
+      .toArray
+
+    // Map from letter to key byte array used for letter, and expected value.
+    // We have to go through some acrobatics here since Java's byte array uses 
+    // object identity for .equals. Two byte arrays can have identical byte 
+    // elements, but not be equal.
+    var expected = Map[String, (Array[Byte], String)]()
+
+    (0 until 100).foreach(loop => {
+      (0 until 30).foreach(i => {
+        val idx = rand.nextInt(keys.length)
+        val randomValue = letters(rand.nextInt(keys.length))
+        val key = keys(idx)
+        val currentVal = store.get(key)
+        store.put(key, b(randomValue))
+        expected += letters(idx) -> (key, randomValue)
+      })
+
+      for ((k, v) <- expected) {
+        val bytes = store.get(v._1)
+        assertNotNull(bytes)
+        assertEquals(v._2, new String(bytes, "UTF-8"))
+      }
+
+      // Iterating and making structural modifications (deletion) does not look right.
+      // Separating these two steps
+      val iterator = store.all
+      val allKeys = new ArrayBuffer[Array[Byte]]()
+
+      // First iterate
+      while (iterator.hasNext) {
+        allKeys += iterator.next.getKey
+      }
+      iterator.close
+
+      // And now delete
+      for (key <- allKeys) {
+        store.delete(key)
+        expected -= new String(key, "UTF-8")
+      }
+
+      assertEquals(0, expected.size)
+    })
+  }
+
+  def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) {
+    for (v <- vals) {
+      assertTrue(iter.hasNext)
+      val entry = iter.next()
+      assertEquals(v, s(entry.getKey))
+      assertEquals(v, s(entry.getValue))
+    }
+    assertFalse(iter.hasNext)
+    intercept[NoSuchElementException] { iter.next() }
+  }
+
+  /**
+   * Convert string to byte array
+   */
+  def b(s: String) =
+    s.getBytes
+
+  /**
+   * Convert byte array to string
+   */
+  def s(b: Array[Byte]) =
+    new String(b)
+}
+
+object TestKeyValueStores {
+  val CacheSize = 10
+  val BatchSize = 5
+  @Parameters
+  def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none"))
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..b3624e6
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+
+import org.apache.samza.SamzaException
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.serializers.Serde
+import org.apache.samza.storage.{StorageEngine, StorageEngineFactory}
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.MessageCollector
+
+/**
+ * A key value storage engine factory implementation
+ *
+ * This trait encapsulates all the steps needed to create a key value storage engine. It is meant to be extended
+ * by the specific key value store factory implementations which will in turn implement the getKVStore method.
+ */
+trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
+
+  /**
+   * Return a KeyValueStore instance for the given store name
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A valid KeyValueStore instance
+   */
+  def getKVStore( storeName: String,
+                  storeDir: File,
+                  registry: MetricsRegistry,
+                  changeLogSystemStreamPartition: SystemStreamPartition,
+                  containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]]
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
+   * @param collector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   **/
+  def getStorageEngine( storeName: String,
+                        storeDir: File,
+                        keySerde: Serde[K],
+                        msgSerde: Serde[V],
+                        collector: MessageCollector,
+                        registry: MetricsRegistry,
+                        changeLogSystemStreamPartition: SystemStreamPartition,
+                        containerContext: SamzaContainerContext): StorageEngine = {
+
+    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
+    val batchSize = storageConfig.getInt("write.batch.size", 500)
+    val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000))
+    val enableCache = cacheSize > 0
+
+    if (cacheSize > 0 && cacheSize < batchSize) {
+      throw new SamzaException("A store's cache.size cannot be less than batch.size as batched values reside in cache.")
+    }
+
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value storage.")
+    }
+
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key value storage.")
+    }
+
+    val kvStore = getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
+
+    val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
+      kvStore
+    } else {
+      val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
+      new LoggedStore(kvStore, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
+    }
+
+    val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
+    val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde, serializedMetrics)
+    val maybeCachedStore = if (enableCache) {
+      val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
+      new CachedStore(serialized, cacheSize, batchSize, cachedStoreMetrics)
+    } else {
+      serialized
+    }
+    val db = new NullSafeKeyValueStore(maybeCachedStore)
+    val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
+
+    // TODO: Decide if we should use raw bytes when restoring
+
+    new KeyValueStorageEngine(db, kvStore, keyValueStorageEngineMetrics, batchSize)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 429f51a..5764093 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -19,9 +19,10 @@
 
 package org.apache.samza.storage.kv
 
-import scala.collection._
 import grizzled.slf4j.Logging
 
+import scala.collection._
+
 /**
  * A write-behind caching layer around the leveldb store. The purpose of this cache is three-fold:
  * 1. Batch together writes to leveldb, this turns out to be a great optimization

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
index 8ffcfb1..df8efae 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
@@ -19,10 +19,9 @@
 
 package org.apache.samza.storage.kv
 
+import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
 
 class CachedStoreMetrics(
   val storeName: String = "unknown",

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index f42ea02..c084144 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -19,15 +19,11 @@
 
 package org.apache.samza.storage.kv
 
-import java.io.File
-import java.nio.ByteBuffer
-import org.apache.samza._
-import org.apache.samza.config.Config
-import org.apache.samza.serializers._
-import scala.collection.JavaConversions._
 import grizzled.slf4j.Logging
-import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.storage.StorageEngine
+import org.apache.samza.system.IncomingMessageEnvelope
+
+import scala.collection.JavaConversions._
 
 /**
  * A key value store.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
deleted file mode 100644
index 36b7e97..0000000
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.serializers._
-import org.apache.samza.SamzaException
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.StorageEngine
-
-class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
-  def getStorageEngine(
-    storeName: String,
-    storeDir: File,
-    keySerde: Serde[K],
-    msgSerde: Serde[V],
-    collector: MessageCollector,
-    registry: MetricsRegistry,
-    changeLogSystemStreamPartition: SystemStreamPartition,
-    containerContext: SamzaContainerContext): StorageEngine = {
-
-    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
-    val batchSize = storageConfig.getInt("write.batch.size", 500)
-    val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000))
-    val deleteCompactionThreshold = storageConfig.getInt("compaction.delete.threshold", -1)
-    val enableCache = cacheSize > 0
-
-    if (cacheSize > 0 && cacheSize < batchSize) {
-      throw new SamzaException("A stores cache.size cannot be less than batch.size as batched values reside in cache.")
-    }
-
-    if (keySerde == null) {
-      throw new SamzaException("Must define a key serde when using key value storage.")
-    }
-
-    if (msgSerde == null) {
-      throw new SamzaException("Must define a message serde when using key value storage.")
-    }
-
-    val levelDbMetrics = new LevelDbKeyValueStoreMetrics(storeName, registry)
-    val levelDbOptions = LevelDbKeyValueStore.options(storageConfig, containerContext)
-    val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, deleteCompactionThreshold, levelDbMetrics)
-    val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
-      levelDb
-    } else {
-      val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
-      new LoggedStore(levelDb, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
-    }
-    val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
-    val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde, serializedMetrics)
-    val maybeCachedStore = if (enableCache) {
-      val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
-      new CachedStore(serialized, cacheSize, batchSize, cachedStoreMetrics)
-    } else {
-      serialized
-    }
-    val db = new NullSafeKeyValueStore(maybeCachedStore)
-    val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
-
-    // Decide if we should use raw bytes when restoring
-
-    new KeyValueStorageEngine(db, levelDb, keyValueStorageEngineMetrics, batchSize)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index a7958f6..233fba9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -19,10 +19,9 @@
 
 package org.apache.samza.storage.kv
 
+import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
 
 class KeyValueStorageEngineMetrics(
   val storeName: String = "unknown",

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
new file mode 100644
index 0000000..79092b9
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.{MetricsHelper, MetricsRegistry, MetricsRegistryMap}
+
+class KeyValueStoreMetrics(
+  val storeName: String = "unknown",
+  val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+  val gets = newCounter("gets")
+  val ranges = newCounter("ranges")
+  val alls = newCounter("alls")
+  val puts = newCounter("puts")
+  val deletes = newCounter("deletes")
+  val flushes = newCounter("flushes")
+  val bytesWritten = newCounter("bytes-written")
+  val bytesRead = newCounter("bytes-read")
+
+  override def getPrefix = storeName + "-"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
deleted file mode 100644
index 72562cf..0000000
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.nio.ByteBuffer
-import org.iq80.leveldb._
-import org.fusesource.leveldbjni.internal.NativeComparator
-import org.fusesource.leveldbjni.JniDBFactory._
-import java.io._
-import java.util.Iterator
-import java.lang.Iterable
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import grizzled.slf4j.{ Logger, Logging }
-
-object LevelDbKeyValueStore {
-  private lazy val logger = Logger(classOf[LevelDbKeyValueStore])
-
-  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
-    val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
-    val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
-    val options = new Options
-
-    // Cache size and write buffer size are specified on a per-container basis.
-    options.cacheSize(cacheSize / containerContext.partitions.size)
-    options.writeBufferSize((writeBufSize / containerContext.partitions.size).toInt)
-    options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
-    options.compressionType(
-      storeConfig.get("leveldb.compression", "snappy") match {
-        case "snappy" => CompressionType.SNAPPY
-        case "none" => CompressionType.NONE
-        case _ =>
-          logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format storeConfig.get("leveldb.compression", "snappy"))
-          CompressionType.SNAPPY
-      })
-    options.createIfMissing(true)
-    options.errorIfExists(true)
-    options
-  }
-}
-
-class LevelDbKeyValueStore(
-  val dir: File,
-  val options: Options,
-
-  /**
-   * How many deletes must occur before we will force a compaction. This is to
-   * get around performance issues discovered in SAMZA-254. A value of -1 
-   * disables this feature.
-   */
-  val deleteCompactionThreshold: Int = -1,
-  val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
-
-  private lazy val db = factory.open(dir, options)
-  private val lexicographic = new LexicographicComparator()
-  private var deletesSinceLastCompaction = 0
-
-  def get(key: Array[Byte]): Array[Byte] = {
-    maybeCompact
-    metrics.gets.inc
-    require(key != null, "Null key not allowed.")
-    val found = db.get(key)
-    if (found != null) {
-      metrics.bytesRead.inc(found.size)
-    }
-    found
-  }
-
-  def put(key: Array[Byte], value: Array[Byte]) {
-    metrics.puts.inc
-    require(key != null, "Null key not allowed.")
-    if (value == null) {
-      db.delete(key)
-      deletesSinceLastCompaction += 1
-    } else {
-      metrics.bytesWritten.inc(key.size + value.size)
-      db.put(key, value)
-    }
-  }
-
-  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
-    val batch = db.createWriteBatch()
-    val iter = entries.iterator
-    var wrote = 0
-    var deletes = 0
-    while (iter.hasNext) {
-      wrote += 1
-      val curr = iter.next()
-      if (curr.getValue == null) {
-        deletes += 1
-        batch.delete(curr.getKey)
-      } else {
-        val key = curr.getKey
-        val value = curr.getValue
-        metrics.bytesWritten.inc(key.size + value.size)
-        batch.put(key, value)
-      }
-    }
-    db.write(batch)
-    batch.close
-    metrics.puts.inc(wrote)
-    metrics.deletes.inc(deletes)
-    deletesSinceLastCompaction += deletes
-  }
-
-  def delete(key: Array[Byte]) {
-    metrics.deletes.inc
-    put(key, null)
-  }
-
-  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
-    maybeCompact
-    metrics.ranges.inc
-    require(from != null && to != null, "Null bound not allowed.")
-    new LevelDbRangeIterator(db.iterator, from, to)
-  }
-
-  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
-    maybeCompact
-    metrics.alls.inc
-    val iter = db.iterator()
-    iter.seekToFirst()
-    new LevelDbIterator(iter)
-  }
-
-  /**
-   * Trigger a complete compaction on the LevelDB store if there have been at
-   * least deleteCompactionThreshold deletes since the last compaction.
-   */
-  def maybeCompact = {
-    if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
-      compact
-    }
-  }
-
-  /**
-   * Trigger a complete compaction of the LevelDB store.
-   */
-  def compact {
-    // According to LevelDB's docs:
-    // begin==NULL is treated as a key before all keys in the database.
-    // end==NULL is treated as a key after all keys in the database.
-    db.compactRange(null, null)
-    deletesSinceLastCompaction = 0
-  }
-
-  def flush {
-    metrics.flushes.inc
-    // TODO can't find a flush for leveldb
-    trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
-  }
-
-  def close() {
-    trace("Closing.")
-
-    db.close()
-  }
-
-  class LevelDbIterator(iter: DBIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
-    private var open = true
-    def close() = {
-      open = false
-      iter.close()
-    }
-    def remove() = iter.remove()
-    def hasNext() = iter.hasNext()
-    def next() = {
-      if (!hasNext()) {
-        throw new NoSuchElementException
-      }
-
-      val curr = iter.next
-      val key = curr.getKey
-      val value = curr.getValue
-      metrics.bytesRead.inc(key.size)
-      if (value != null) {
-        metrics.bytesRead.inc(value.size)
-      }
-      new Entry(key, value)
-    }
-    override def finalize() {
-      if (open) {
-        System.err.println("Leaked reference to level db iterator, forcing close.")
-        close()
-      }
-    }
-  }
-
-  class LevelDbRangeIterator(iter: DBIterator, from: Array[Byte], to: Array[Byte]) extends LevelDbIterator(iter) {
-    val comparator = if (options.comparator == null) lexicographic else options.comparator
-    iter.seek(from)
-    override def hasNext() = {
-      iter.hasNext() && comparator.compare(iter.peekNext.getKey, to) <= 0
-    }
-  }
-
-  /**
-   * Compare two array lexicographically using unsigned byte arithmetic
-   */
-  class LexicographicComparator extends DBComparator {
-    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
-      val l = math.min(k1.length, k2.length)
-      var i = 0
-      while (i < l) {
-        if (k1(i) != k2(i))
-          return (k1(i) & 0xff) - (k2(i) & 0xff)
-        i += 1
-      }
-      // okay prefixes are equal, the shorter array is less
-      k1.length - k2.length
-    }
-    def name(): String = "lexicographic"
-    def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
-    def findShortSuccessor(key: Array[Byte]) = key
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
deleted file mode 100644
index 8949f2f..0000000
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
-
-class LevelDbKeyValueStoreMetrics(
-  val storeName: String = "unknown",
-  val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
-
-  val gets = newCounter("gets")
-  val ranges = newCounter("ranges")
-  val alls = newCounter("alls")
-  val puts = newCounter("puts")
-  val deletes = newCounter("deletes")
-  val flushes = newCounter("flushes")
-  val bytesWritten = newCounter("bytes-written")
-  val bytesRead = newCounter("bytes-read")
-
-  override def getPrefix = storeName + "-"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index d3c1ae8..4ad6312 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -19,12 +19,9 @@
 
 package org.apache.samza.storage.kv
 
-import java.nio.ByteBuffer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
 import grizzled.slf4j.Logging
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.MessageCollector
 
 /**
  * A key/value store decorator that adds a changelog for any changes made to the underlying store

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
index ea56e8c..743151a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
@@ -19,10 +19,9 @@
 
 package org.apache.samza.storage.kv
 
+import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
 
 class LoggedStoreMetrics(
   val storeName: String = "unknown",

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 00f9af3..4f48cf4 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Util.notNull
+
 import scala.collection.JavaConversions._
 
 object NullSafeKeyValueStore {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 2d3b6e5..51ee68f 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -19,9 +19,8 @@
 
 package org.apache.samza.storage.kv
 
-import java.util.Iterator
-import org.apache.samza.serializers._
 import grizzled.slf4j.Logging
+import org.apache.samza.serializers._
 
 /**
  * A key-value store wrapper that handles serialization

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index 2ad21c8..841e4a2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -19,10 +19,9 @@
 
 package org.apache.samza.storage.kv
 
+import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
 
 class SerializedKeyValueStoreMetrics(
   val storeName: String = "unknown",

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
deleted file mode 100644
index 4856be0..0000000
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-import java.util.Arrays
-import java.util.Random
-import scala.collection.JavaConversions._
-import org.iq80.leveldb.Options
-import org.junit.After
-import org.junit.Assert._
-import org.junit.Before
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-import org.apache.samza.serializers.Serde
-import org.scalatest.Assertions.intercept
-
-@RunWith(value = classOf[Parameterized])
-class TestKeyValueStores(typeOfStore: String) {
-  import TestKeyValueStores._
-
-  val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
-  val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
-  var store: KeyValueStore[Array[Byte], Array[Byte]] = null
-  var cache = false
-  var serde = false
-
-  @Before
-  def setup() {
-    dir.mkdirs()
-    val leveldb = new LevelDbKeyValueStore(dir, new Options)
-    val passThroughSerde = new Serde[Array[Byte]] {
-      def toBytes(obj: Array[Byte]) = obj
-      def fromBytes(bytes: Array[Byte]) = bytes
-    }
-    store = if ("cache".equals(typeOfStore)) {
-      cache = true
-      new CachedStore(leveldb, CacheSize, BatchSize)
-    } else if ("serde".equals(typeOfStore)) {
-      serde = true
-      new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde)
-    } else if ("cache-and-serde".equals(typeOfStore)) {
-      val serializedStore = new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde)
-      serde = true
-      cache = true
-      new CachedStore(serializedStore, CacheSize, BatchSize)
-    } else {
-      leveldb
-    }
-
-    store = new NullSafeKeyValueStore(store)
-  }
-
-  @After
-  def teardown() {
-    store.close
-    for (file <- dir.listFiles)
-      file.delete()
-    dir.delete()
-  }
-
-  @Test
-  def getNonExistantIsNull() {
-    assertNull(store.get(b("hello")))
-  }
-
-  @Test
-  def putAndGet() {
-    store.put(b("k"), b("v"))
-    assertTrue(Arrays.equals(b("v"), store.get(b("k"))))
-  }
-
-  @Test
-  def doublePutAndGet() {
-    val k = b("k2")
-    store.put(k, b("v1"))
-    store.put(k, b("v2"))
-    store.put(k, b("v3"))
-    assertTrue(Arrays.equals(b("v3"), store.get(k)))
-  }
-
-  @Test
-  def testNullsWithSerde() {
-    if (serde) {
-      val a = b("a")
-      val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
-      val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
-
-      intercept[NullPointerException] { store.get(null) }
-      intercept[NullPointerException] { store.delete(null) }
-      intercept[NullPointerException] { store.put(null, a) }
-      intercept[NullPointerException] { store.put(a, null) }
-      intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
-      intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
-      intercept[NullPointerException] { store.range(a, null) }
-      intercept[NullPointerException] { store.range(null, a) }
-    }
-  }
-
-  @Test
-  def testPutAll() {
-    // Use CacheSize - 1 so we fully fill the cache, but don't write any data 
-    // out. Our check (below) uses == for cached entries, and using 
-    // numEntires >= CacheSize would result in the LRU cache dropping some 
-    // entries. The result would be that we get the correct byte array back 
-    // from the cache's underlying store (leveldb), but that == would fail.
-    val numEntries = CacheSize - 1
-    val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
-    store.putAll(entries)
-    if (cache) {
-      assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue))
-    } else {
-      assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue)))
-    }
-  }
-
-  @Test
-  def testIterateAll() {
-    for (letter <- letters)
-      store.put(b(letter.toString), b(letter.toString))
-    val iter = store.all
-    checkRange(letters, iter)
-    iter.close()
-  }
-
-  @Test
-  def testRange() {
-    val from = 5
-    val to = 20
-    for (letter <- letters)
-      store.put(b(letter.toString), b(letter.toString))
-    val iter = store.range(b(letters(from)), b(letters(to)))
-    checkRange(letters.slice(from, to + 1), iter)
-    iter.close()
-  }
-
-  @Test
-  def testDelete() {
-    val a = b("a")
-    assertNull(store.get(a))
-    store.put(a, a)
-    assertTrue(Arrays.equals(a, store.get(a)))
-    store.delete(a)
-    assertNull(store.get(a))
-  }
-
-  @Test
-  def testSimpleScenario() {
-    val vals = letters.map(b(_))
-    for (v <- vals) {
-      assertNull(store.get(v))
-      store.put(v, v)
-      assertTrue(Arrays.equals(v, store.get(v)))
-    }
-    vals.foreach(v => assertTrue(Arrays.equals(v, store.get(v))))
-    vals.foreach(v => store.delete(v))
-    vals.foreach(v => assertNull(store.get(v)))
-  }
-
-  /**
-   * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
-   * implementation. The issue is that it doesn't work. More specifically,
-   * creating a DoubleLinkedList from an existing list does not update the
-   * "prev" field of the existing list's head to point to the new head. As a
-   * result, in Scala 2.8.1, every DoulbeLinkedList node's prev field is null.
-   * Samza gets around this by manually updating the field itself. See SAMZA-80
-   * for details.
-   *
-   * This issue is exposed in Samza's KV cache implementation, which uses
-   * DoubleLinkedList, so all comments in this method are discussing the cached
-   * implementation, but the test is still useful as a sanity check for
-   * non-cached stores.
-   */
-  @Test
-  def testBrokenScalaDoubleLinkedList() {
-    val something = b("")
-    val keys = letters
-      .map(b(_))
-      .toArray
-
-    // Load the cache to capacity.
-    letters
-      .slice(0, TestKeyValueStores.CacheSize)
-      .map(b(_))
-      .foreach(store.put(_, something))
-
-    // Now keep everything in the cache, but with an empty dirty list.
-    store.flush
-
-    // Dirty list is now empty, and every CacheEntry has dirty=null.
-
-    // Corrupt the dirty list by creating two dirty lists that toggle back and 
-    // forth depending on whether the last dirty write was to 1 or 0. The trick
-    // here is that every element in the cache is treated as the "head" of the
-    // DoulbeLinkedList (prev==null), even though it's not necessarily. Thus,
-    // You can end up with multiple nodes each having their own version of the 
-    // dirty list with different elements in them.
-    store.put(keys(1), something)
-    store.put(keys(0), something)
-    store.put(keys(1), something)
-    store.flush
-    // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
-    store.put(keys(0), something)
-    // The dirty list now has 0 and 1, but 1's dirty field is null in the 
-    // cache because it was just flushed.
-
-    // Get rid of 1 from the cache by reading every other element, and then 
-    // putting one new element.
-    letters
-      .slice(2, TestKeyValueStores.CacheSize)
-      .map(b(_))
-      .foreach(store.get(_))
-    store.put(keys(TestKeyValueStores.CacheSize), something)
-
-    // Now try and trigger an NPE since the dirty list has an element (1) 
-    // that's no longer in the cache.
-    store.flush
-  }
-
-  /**
-   * A little test that tries to simulate a few common patterns:
-   * read-modify-write, and do-some-stuff-then-delete (windowing).
-   */
-  @Test
-  def testRandomReadWriteRemove() {
-    // Make test deterministic by seeding the random number generator.
-    val rand = new Random(12345)
-    val keys = letters
-      .map(b(_))
-      .toArray
-
-    // Map from letter to key byte array used for letter, and expected value.
-    // We have to go through some acrobatics here since Java's byte array uses 
-    // object identity for .equals. Two byte arrays can have identical byte 
-    // elements, but not be equal.
-    var expected = Map[String, (Array[Byte], String)]()
-
-    (0 until 100).foreach(loop => {
-      (0 until 30).foreach(i => {
-        val idx = rand.nextInt(keys.length)
-        val randomValue = letters(rand.nextInt(keys.length))
-        val key = keys(idx)
-        val currentVal = store.get(key)
-        store.put(key, b(randomValue))
-        expected += letters(idx) -> (key, randomValue)
-      })
-
-      for ((k, v) <- expected) {
-        val bytes = store.get(v._1)
-        assertNotNull(bytes)
-        assertEquals(v._2, new String(bytes, "UTF-8"))
-      }
-
-      val iterator = store.all
-
-      while (iterator.hasNext) {
-        val key = iterator.next.getKey
-        store.delete(key)
-        expected -= new String(key, "UTF-8")
-      }
-
-      iterator.close
-
-      assertEquals(0, expected.size)
-    })
-  }
-
-  def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) {
-    for (v <- vals) {
-      assertTrue(iter.hasNext)
-      val entry = iter.next()
-      assertEquals(v, s(entry.getKey))
-      assertEquals(v, s(entry.getValue))
-    }
-    assertFalse(iter.hasNext)
-    intercept[NoSuchElementException] { iter.next() }
-  }
-
-  /**
-   * Convert string to byte buffer
-   */
-  def b(s: String) =
-    s.getBytes
-
-  /**
-   * Convert byte buffer to string
-   */
-  def s(b: Array[Byte]) =
-    new String(b)
-}
-
-object TestKeyValueStores {
-  val CacheSize = 10
-  val BatchSize = 5
-  @Parameters
-  def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("cache"), Array("serde"), Array("cache-and-serde"), Array("leveldb"))
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 0077af0..c0ac5dd 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -26,12 +26,10 @@ import org.apache.samza.container.SamzaContainerContext
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.kv.KeyValueStorageEngine
-import org.apache.samza.storage.kv.KeyValueStorageEngineFactory
 import org.apache.samza.storage.StorageEngineFactory
 import org.apache.samza.task.ReadableCollector
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Util
-import org.apache.samza.serializers.Serde
 import org.apache.samza.serializers.ByteSerde
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 6ba6280..db5c32b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-include 'samza-api', 'samza-core', 'samza-kafka', 'samza-kv', 'samza-serializers', 'samza-shell', 'samza-yarn', 'samza-test'
+include 'samza-api', 'samza-core', 'samza-kafka', 'samza-kv', 'samza-kv-inmemory', 'samza-kv-leveldb', 'samza-serializers', 'samza-shell', 'samza-yarn', 'samza-test'
 
 rootProject.children.each {
   if (it.name != 'samza-api' && it.name != 'samza-shell') {