Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/04 18:33:01 UTC

samza git commit: SAMZA-435; remove leveldb

Repository: samza
Updated Branches:
  refs/heads/master f2fd9aaab -> 3a3c278a9


SAMZA-435; remove leveldb


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a3c278a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a3c278a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a3c278a

Branch: refs/heads/master
Commit: 3a3c278a9c16b5cbbedfd11238d3aeb2ef6e61e5
Parents: f2fd9aa
Author: Ruslan Khafizov <ru...@gmail.com>
Authored: Wed Feb 4 09:32:53 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Feb 4 09:32:53 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  14 --
 .../versioned/jobs/configuration-table.html     | 105 +--------
 docs/startup/download/index.md                  |   6 -
 gradle/dependency-versions.gradle               |   1 -
 .../kv/KeyValueStorageEngineFactory.scala       |  42 ----
 .../LevelDbKeyValueStorageEngineFactory.scala   |  61 ------
 .../samza/storage/kv/LevelDbKeyValueStore.scala | 218 -------------------
 .../apache/samza/storage/kv/CachedStore.scala   |   8 +-
 .../src/main/config/perf/kv-perf.properties     |   2 +-
 .../samza/storage/kv/TestKeyValueStores.scala   |  15 +-
 .../test/integration/TestStatefulTask.scala     |   2 +-
 settings.gradle                                 |   1 -
 12 files changed, 11 insertions(+), 464 deletions(-)
----------------------------------------------------------------------
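
For jobs that pointed stores.*.factory at the removed KeyValueStorageEngineFactory or LevelDbKeyValueStorageEngineFactory, the diffs below (kv-perf.properties and TestStatefulTask.scala) show the core change: switch the factory class to the RocksDB one (LevelDB-specific tuning properties such as stores.*.leveldb.compression disappear along with the removed docs section). A minimal sketch of such a store configuration, written as a Scala map in the style of TestStatefulTask; the store name, serdes, and changelog stream are placeholders rather than anything defined by this commit:

object RocksDbMigrationExample {
  // Hypothetical store config after this commit: only the factory class
  // differs from a LevelDB-backed store definition.
  val storeConfig: Map[String, String] = Map(
    "stores.mystore.factory" -> "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory",
    "stores.mystore.key.serde" -> "string",
    "stores.mystore.msg.serde" -> "string",
    "stores.mystore.changelog" -> "kafka.mystoreChangelog",
    // Retained unchanged in kv-perf.properties below.
    "stores.mystore.compaction.delete.threshold" -> "1000")
}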


http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4f9fb44..4d0b44f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -328,19 +328,6 @@ project(":samza-kv-inmemory_$scalaVersion") {
   }
 }
 
-project(":samza-kv-leveldb_$scalaVersion") {
-  apply plugin: 'scala'
-
-  dependencies {
-    compile project(':samza-api')
-    compile project(":samza-core_$scalaVersion")
-    compile project(":samza-kv_$scalaVersion")
-    compile "org.scala-lang:scala-library:$scalaLibVersion"
-    compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion"
-    testCompile "junit:junit:$junitVersion"
-  }
-}
-
 project(":samza-kv-rocksdb_$scalaVersion") {
   apply plugin: 'scala'
 
@@ -368,7 +355,6 @@ project(":samza-test_$scalaVersion") {
   dependencies {
     compile project(':samza-api')
     compile project(":samza-kv-inmemory_$scalaVersion")
-    compile project(":samza-kv-leveldb_$scalaVersion")
     compile project(":samza-kv-rocksdb_$scalaVersion")
     compile project(":samza-core_$scalaVersion")
     runtime project(":samza-log4j")

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 008fc78..ec12874 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -832,13 +832,8 @@
                         <a href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config, org.apache.samza.task.TaskContext)">init()</a>
                         method). The value of this property is the fully-qualified name of a Java class that implements
                         <a href="../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html">StorageEngineFactory</a>.
-                        Samza currently ships with two storage engine implementations:
+                        Samza currently ships with one storage engine implementation:
                         <dl>
-                            <dt><code>org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory</code></dt>
-                            <dd>An on-disk storage engine with a key-value interface, implemented using
-                                <a href="https://code.google.com/p/leveldb/">LevelDB</a>. It supports fast random-access
-                                reads and writes, as well as range queries on keys. LevelDB can be configured with
-                                various <a href="#keyvalue-leveldb">additional tuning parameters</a>.</dd>
                             <dt><code>org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory</code></dt>
                             <dd>An on-disk storage engine with a key-value interface, implemented using
                                 <a href="http://rocksdb.org/">RocksDB</a>. It supports fast random-access
@@ -1004,7 +999,7 @@
                           <dt><code>fifo</code></dt>
                           <dd>Use <a href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> compaction.</dd>
                           <dt><code>level</code></dt>
-                          <dd>Use LevelDB's standard leveled compaction.</dd>
+                          <dd>Use RocksDB's standard leveled compaction.</dd>
                       </dl>
                     </td>
                 </tr>
@@ -1018,102 +1013,6 @@
                 </tr>
 
                 <tr>
-                    <th colspan="3" class="section" id="keyvalue-leveldb">
-                        Using LevelDB for key-value storage<br>
-                        <span class="subtitle">
-                            (This section applies if you have set
-                            <a href="#stores-factory" class="property">stores.*.factory</a>
-                            <code>= org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory</code>)
-                        </span>
-                    </th>
-                </tr>
-
-                <tr>
-                    <td class="property" id="stores-leveldb-write-batch-size">stores.<span class="store">store-name</span>.<br>write.batch.size</td>
-                    <td class="default">500</td>
-                    <td class="description">
-                        For better write performance, the storage engine buffers writes and applies them
-                        to the underlying store in a batch. If the same key is written multiple times
-                        in quick succession, this buffer also deduplicates writes to the same key. This
-                        property is set to the number of key/value pairs that should be kept in this
-                        in-memory buffer, per task instance. The number cannot be greater than
-                        <a href="#stores-leveldb-object-cache-size" class="property">stores.*.object.cache.size</a>.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="stores-leveldb-object-cache-size">stores.<span class="store">store-name</span>.<br>object.cache.size</td>
-                    <td class="default">1000</td>
-                    <td class="description">
-                        Samza maintains an additional cache in front of LevelDB for frequently-accessed
-                        objects. This cache contains deserialized objects (avoiding the deserialization
-                        overhead on cache hits), in contrast to the LevelDB block cache
-                        (<a href="#stores-leveldb-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
-                        which caches serialized objects. This property determines the number of objects
-                        to keep in Samza's cache, per task instance. This same cache is also used for
-                        write buffering (see <a href="#stores-leveldb-write-batch-size" class="property">stores.*.write.batch.size</a>).
-                        A value of 0 disables all caching and batching.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="stores-leveldb-container-cache-size-bytes">stores.<span class="store">store-name</span>.container.<br>cache.size.bytes</td>
-                    <td class="default">104857600</td>
-                    <td class="description">
-                        The size of LevelDB's block cache in bytes, per container. If there are several
-                        task instances within one container, each is given a proportional share of this cache.
-                        Note that this is an off-heap memory allocation, so the container's total memory use
-                        is the maximum JVM heap size <em>plus</em> the size of this cache.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="stores-leveldb-container-write-buffer-size-bytes">stores.<span class="store">store-name</span>.container.<br>write.buffer.size.bytes</td>
-                    <td class="default">33554432</td>
-                    <td class="description">
-                        The amount of memory (in bytes) that LevelDB uses for buffering writes before they are
-                        written to disk, per container. If there are several task instances within one
-                        container, each is given a proportional share of this buffer. This setting also
-                        determines the size of LevelDB's segment files.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="stores-leveldb-compaction-delete-threshold">stores.<span class="store">store-name</span>.<br>compaction.delete.threshold</td>
-                    <td class="default">-1</td>
-                    <td class="description">
-                        Setting this property forces a LevelDB compaction to be performed after a certain
-                        number of keys have been deleted from the store. This is used to work around
-                        <a href="https://issues.apache.org/jira/browse/SAMZA-254">performance issues</a>
-                        in certain workloads.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="stores-leveldb-compression">stores.<span class="store">store-name</span>.<br>leveldb.compression</td>
-                    <td class="default">snappy</td>
-                    <td class="description">
-                        This property controls whether LevelDB should compress data on disk and in the
-                        block cache. The following values are valid:
-                        <dl>
-                            <dt><code>snappy</code></dt>
-                            <dd>Compress data using the <a href="https://code.google.com/p/snappy/">Snappy</a> codec.</dd>
-                            <dt><code>none</code></dt>
-                            <dd>Do not compress data.</dd>
-                        </dl>
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="stores-leveldb-block-size-bytes">stores.<span class="store">store-name</span>.<br>leveldb.block.size.bytes</td>
-                    <td class="default">4096</td>
-                    <td class="description">
-                        If compression is enabled, LevelDB groups approximately this many uncompressed bytes
-                        into one compressed block. You probably don't need to change this property.
-                    </td>
-                </tr>
-
-                <tr>
                     <th colspan="3" class="section" id="yarn">
                         Running your job on a <a href="../jobs/yarn-jobs.html">YARN</a> cluster<br>
                         <span class="subtitle">

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/docs/startup/download/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/download/index.md b/docs/startup/download/index.md
index e358453..329969e 100644
--- a/docs/startup/download/index.md
+++ b/docs/startup/download/index.md
@@ -75,12 +75,6 @@ A Maven-based Samza project can pull in all required dependencies Samza dependen
 </dependency>
 <dependency>
   <groupId>org.apache.samza</groupId>
-  <artifactId>samza-kv-leveldb_2.10</artifactId>
-  <version>0.8.0</version>
-  <scope>runtime</scope>
-</dependency>
-<dependency>
-  <groupId>org.apache.samza</groupId>
   <artifactId>samza-kv-rocksdb_2.10</artifactId>
   <version>0.8.0</version>
   <scope>runtime</scope>

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 7bbaa41..12b324d 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -28,7 +28,6 @@
   metricsVersion = "2.2.0"
   kafkaVersion = "0.8.2-beta"
   commonsHttpClientVersion = "3.1"
-  leveldbVersion = "1.8"
   rocksdbVersion = "3.5.1"
   yarnVersion = "2.4.0"
   slf4jVersion = "1.6.2"

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
deleted file mode 100644
index 0ba4f8a..0000000
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStreamPartition
-
-/**
- * A backwards compatible factory that points to LevelDb. This exists for all the old Samza jobs
- * that still refer to KeyValueStorageEngineFactory.
- */
-class KeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
-
-  override def getKVStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: SystemStreamPartition,
-                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
-    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
deleted file mode 100644
index 9642823..0000000
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.serializers._
-import org.apache.samza.SamzaException
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.StorageEngine
-
-object LevelDbKeyValueStorageEngineFactory {
-  def getKeyValueStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: SystemStreamPartition,
-                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
-    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
-    val deleteCompactionThreshold = storageConfig.getInt("compaction.delete.threshold", -1)
-
-    val levelDbMetrics = new KeyValueStoreMetrics(storeName, registry)
-    val levelDbOptions = LevelDbKeyValueStore.options(storageConfig, containerContext)
-    val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, deleteCompactionThreshold, levelDbMetrics)
-
-    levelDb
-  }
-
-}
-
-class LevelDbKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
-
-  override def getKVStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: SystemStreamPartition,
-                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
-    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
deleted file mode 100644
index f4a021a..0000000
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import org.iq80.leveldb._
-import org.fusesource.leveldbjni.JniDBFactory._
-import java.io._
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.util.{LexicographicComparator, Logging}
-
-object LevelDbKeyValueStore extends Logging {
-
-  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
-    val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
-    val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
-    val options = new Options
-
-    // Cache size and write buffer size are specified on a per-container basis.
-    options.cacheSize(cacheSize / containerContext.taskNames.size)
-    options.writeBufferSize((writeBufSize / containerContext.taskNames.size).toInt)
-    options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
-    options.compressionType(
-      storeConfig.get("leveldb.compression", "snappy") match {
-        case "snappy" => CompressionType.SNAPPY
-        case "none" => CompressionType.NONE
-        case _ =>
-          warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format storeConfig.get("leveldb.compression", "snappy"))
-          CompressionType.SNAPPY
-      })
-    options.createIfMissing(true)
-    options.errorIfExists(true)
-    options
-  }
-}
-
-class LevelDbKeyValueStore(
-  val dir: File,
-  val options: Options,
-
-  /**
-   * How many deletes must occur before we will force a compaction. This is to
-   * get around performance issues discovered in SAMZA-254. A value of -1 
-   * disables this feature.
-   */
-  val deleteCompactionThreshold: Int = -1,
-  val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
-
-  private lazy val db = factory.open(dir, options)
-  private val lexicographic = new LevelDBLexicographicComparator()
-  private var deletesSinceLastCompaction = 0
-
-  def get(key: Array[Byte]): Array[Byte] = {
-    maybeCompact
-    metrics.gets.inc
-    require(key != null, "Null key not allowed.")
-    val found = db.get(key)
-    if (found != null) {
-      metrics.bytesRead.inc(found.size)
-    }
-    found
-  }
-
-  def put(key: Array[Byte], value: Array[Byte]) {
-    metrics.puts.inc
-    require(key != null, "Null key not allowed.")
-    if (value == null) {
-      db.delete(key)
-      deletesSinceLastCompaction += 1
-    } else {
-      metrics.bytesWritten.inc(key.size + value.size)
-      db.put(key, value)
-    }
-  }
-
-  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
-    val batch = db.createWriteBatch()
-    val iter = entries.iterator
-    var wrote = 0
-    var deletes = 0
-    while (iter.hasNext) {
-      wrote += 1
-      val curr = iter.next()
-      if (curr.getValue == null) {
-        deletes += 1
-        batch.delete(curr.getKey)
-      } else {
-        val key = curr.getKey
-        val value = curr.getValue
-        metrics.bytesWritten.inc(key.size + value.size)
-        batch.put(key, value)
-      }
-    }
-    db.write(batch)
-    batch.close
-    metrics.puts.inc(wrote)
-    metrics.deletes.inc(deletes)
-    deletesSinceLastCompaction += deletes
-  }
-
-  def delete(key: Array[Byte]) {
-    metrics.deletes.inc
-    put(key, null)
-  }
-
-  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
-    maybeCompact
-    metrics.ranges.inc
-    require(from != null && to != null, "Null bound not allowed.")
-    new LevelDbRangeIterator(db.iterator, from, to)
-  }
-
-  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
-    maybeCompact
-    metrics.alls.inc
-    val iter = db.iterator()
-    iter.seekToFirst()
-    new LevelDbIterator(iter)
-  }
-
-  /**
-   * Trigger a complete compaction on the LevelDB store if there have been at
-   * least deleteCompactionThreshold deletes since the last compaction.
-   */
-  def maybeCompact = {
-    if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
-      compact
-    }
-  }
-
-  /**
-   * Trigger a complete compaction of the LevelDB store.
-   */
-  def compact {
-    // According to LevelDB's docs:
-    // begin==NULL is treated as a key before all keys in the database.
-    // end==NULL is treated as a key after all keys in the database.
-    db.compactRange(null, null)
-    deletesSinceLastCompaction = 0
-  }
-
-  def flush {
-    metrics.flushes.inc
-    // TODO can't find a flush for leveldb
-    trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
-  }
-
-  def close() {
-    trace("Closing.")
-
-    db.close()
-  }
-
-  class LevelDbIterator(iter: DBIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
-    private var open = true
-    def close() = {
-      open = false
-      iter.close()
-    }
-    def remove() = iter.remove()
-    def hasNext() = iter.hasNext()
-    def next() = {
-      if (!hasNext()) {
-        throw new NoSuchElementException
-      }
-
-      val curr = iter.next
-      val key = curr.getKey
-      val value = curr.getValue
-      metrics.bytesRead.inc(key.size)
-      if (value != null) {
-        metrics.bytesRead.inc(value.size)
-      }
-      new Entry(key, value)
-    }
-    override def finalize() {
-      if (open) {
-        System.err.println("Leaked reference to level db iterator, forcing close.")
-        close()
-      }
-    }
-  }
-
-  class LevelDbRangeIterator(iter: DBIterator, from: Array[Byte], to: Array[Byte]) extends LevelDbIterator(iter) {
-    val comparator = if (options.comparator == null) lexicographic else options.comparator
-    iter.seek(from)
-    override def hasNext() = {
-      iter.hasNext() && comparator.compare(iter.peekNext.getKey, to) < 0
-    }
-  }
-
-  /**
-   * Compare two array lexicographically using unsigned byte arithmetic
-   */
-  class LevelDBLexicographicComparator extends LexicographicComparator with DBComparator {
-    def name(): String = "lexicographic"
-    def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
-    def findShortSuccessor(key: Array[Byte]) = key
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 1971b1f..84cf6db 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -24,14 +24,14 @@ import scala.collection._
 import java.util.Arrays
 
 /**
- * A write-behind caching layer around the leveldb store. The purpose of this cache is three-fold:
- * 1. Batch together writes to leveldb, this turns out to be a great optimization
+ * A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold:
+ * 1. Batch together writes to rocksdb, this turns out to be a great optimization
  * 2. Avoid duplicate writes and duplicate log entries within a commit interval. i.e. if there are two updates to the same key, log only the later.
  * 3. Avoid deserialization cost for gets on very common keys
  *
  * This caching does introduce a few odd corner cases :-(
- * 1. Items in the cache have pass-by-reference semantics but items in leveldb have pass-by-value semantics. Modifying items after a put is a bad idea.
- * 2. Range queries require flushing the cache (as the ordering comes from leveldb)
+ * 1. Items in the cache have pass-by-reference semantics but items in rocksdb have pass-by-value semantics. Modifying items after a put is a bad idea.
+ * 2. Range queries require flushing the cache (as the ordering comes from rocksdb)
  *
  * In implementation this cache is just an LRU hash map that discards the oldest entry when full. There is an accompanying "dirty list" that references keys
  * that have not yet been written to disk. All writes go to the dirty list and when the list is long enough we flush out all those values at once. Dirty items

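The CachedStore Scaladoc above describes the write-behind pattern that motivates these comment-only wording changes: an LRU-style map in front of the store plus a "dirty list" of keys that is flushed out in one batch. Below is a small illustrative sketch of that pattern only, assuming a plain mutable.Map as the backing store; it is not Samza's CachedStore and leaves out serde handling, metrics, and the range-query flush mentioned above:

import scala.collection.mutable

// Illustrative only: a toy write-behind cache in the spirit of the Scaladoc
// above (cache map + dirty-key set, batched flush). Not Samza's CachedStore.
class WriteBehindCache[K, V](store: mutable.Map[K, V],
                             cacheSize: Int,
                             writeBatchSize: Int) {
  private val cache = mutable.LinkedHashMap[K, V]()  // insertion order stands in for recency
  private val dirty = mutable.LinkedHashSet[K]()     // keys not yet written to the store

  def get(key: K): Option[V] =
    cache.get(key).orElse {
      val loaded = store.get(key)
      loaded.foreach(v => cachePut(key, v))          // avoid re-reading hot keys from the store
      loaded
    }

  def put(key: K, value: V): Unit = {
    cachePut(key, value)
    dirty += key                                     // repeated puts to one key collapse here
    if (dirty.size >= writeBatchSize) flush()
  }

  def flush(): Unit = {
    dirty.foreach(k => store.put(k, cache(k)))       // single batched write-out
    dirty.clear()
  }

  private def cachePut(key: K, value: V): Unit = {
    cache.remove(key)                                // re-insert so the entry counts as recent
    cache.put(key, value)
    if (cache.size > cacheSize) {
      val eldest = cache.head._1
      if (dirty.contains(eldest)) flush()            // never drop an unflushed entry
      cache.remove(eldest)
    }
  }
}
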
http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-test/src/main/config/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties
index dcc223f..0d487b1 100644
--- a/samza-test/src/main/config/perf/kv-perf.properties
+++ b/samza-test/src/main/config/perf/kv-perf.properties
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-stores.test.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+stores.test.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.test.compaction.delete.threshold=1000
 test.partition.count=4
 test.num.loops=1000

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index f592d8e..50dfc10 100644
--- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -25,7 +25,6 @@ import java.util.Random
 
 import org.apache.samza.serializers.Serde
 import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
-import org.iq80.leveldb.Options
 import org.junit.After
 import org.junit.Assert._
 import org.junit.Before
@@ -40,7 +39,7 @@ import scala.collection.mutable.ArrayBuffer
 
 /**
  * Test suite to check different key value store operations
- * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / "inmemory")
+ * @param typeOfStore Defines type of key-value store (Eg: "rocksdb" / "inmemory")
  * @param storeConfig Defines whether we're using caching / serde / both / or none in front of the store
  */
 @RunWith(value = classOf[Parameterized])
@@ -48,7 +47,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   import TestKeyValueStores._
 
   val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
-  val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
+  val dir = new File(System.getProperty("java.io.tmpdir"), "rocksdb-test-" + new Random().nextInt(Int.MaxValue))
   var store: KeyValueStore[Array[Byte], Array[Byte]] = null
   var cache = false
   var serde = false
@@ -56,9 +55,6 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   @Before
   def setup() {
     val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match {
-      case "leveldb" =>
-        dir.mkdirs ()
-        new LevelDbKeyValueStore (dir, new Options)
       case "inmemory" =>
         new InMemoryKeyValueStore
       case "rocksdb" =>
@@ -151,7 +147,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     // out. Our check (below) uses == for cached entries, and using 
     // numEntires >= CacheSize would result in the LRU cache dropping some 
     // entries. The result would be that we get the correct byte array back 
-    // from the cache's underlying store (leveldb), but that == would fail.
+    // from the cache's underlying store (rocksdb), but that == would fail.
     val numEntries = CacheSize - 1
     val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
     store.putAll(entries)
@@ -350,11 +346,6 @@ object TestKeyValueStores {
   val BatchSize = 1000000
   @Parameters
   def parameters: java.util.Collection[Array[String]] = Arrays.asList(
-      //LevelDB
-      Array("leveldb", "cache"),
-      Array("leveldb", "serde"),
-      Array("leveldb", "cache-and-serde"),
-      Array("leveldb", "none"),
       //Inmemory
       Array("inmemory", "cache"),
       Array("inmemory", "serde"),

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 6f0eb21..4c08d6b 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -205,7 +205,7 @@ class TestStatefulTask {
     "task.class" -> "org.apache.samza.test.integration.TestTask",
     "task.inputs" -> "kafka.input",
     "serializers.registry.string.class" -> "org.apache.samza.serializers.StringSerdeFactory",
-    "stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
+    "stores.mystore.factory" -> "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory",
     "stores.mystore.key.serde" -> "string",
     "stores.mystore.msg.serde" -> "string",
     "stores.mystore.changelog" -> "kafka.mystoreChangelog",

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 3a01fd6..bb07a3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,7 +22,6 @@ include \
   'samza-kafka',
   'samza-kv',
   'samza-kv-inmemory',
-  'samza-kv-leveldb',
   'samza-kv-rocksdb',
   'samza-log4j',
   'samza-shell',