You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/10 23:09:44 UTC
git commit: SAMZA-256; write an in-memory store
Repository: incubator-samza
Updated Branches:
refs/heads/master 8edb3fed7 -> 46bc23b92
SAMZA-256; write an in-memory store
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/46bc23b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/46bc23b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/46bc23b9
Branch: refs/heads/master
Commit: 46bc23b926419751c546d1f59c1c241b56d5cbce
Parents: 8edb3fe
Author: Chinmay Soman <ch...@gmail.com>
Authored: Thu Jul 10 14:07:52 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Jul 10 14:07:52 2014 -0700
----------------------------------------------------------------------
build.gradle | 30 +-
gradle/dependency-versions.gradle | 1 +
.../InMemoryKeyValueStorageEngineFactory.scala | 41 +++
.../kv/inmemory/InMemoryKeyValueStore.scala | 115 +++++++
.../kv/KeyValueStorageEngineFactory.scala | 42 +++
.../LevelDbKeyValueStorageEngineFactory.scala | 61 ++++
.../samza/storage/kv/LevelDbKeyValueStore.scala | 234 +++++++++++++
.../samza/storage/kv/TestKeyValueStores.scala | 344 +++++++++++++++++++
.../kv/BaseKeyValueStorageEngineFactory.scala | 118 +++++++
.../apache/samza/storage/kv/CachedStore.scala | 3 +-
.../samza/storage/kv/CachedStoreMetrics.scala | 3 +-
.../storage/kv/KeyValueStorageEngine.scala | 10 +-
.../kv/KeyValueStorageEngineFactory.scala | 86 -----
.../kv/KeyValueStorageEngineMetrics.scala | 3 +-
.../samza/storage/kv/KeyValueStoreMetrics.scala | 38 ++
.../samza/storage/kv/LevelDbKeyValueStore.scala | 234 -------------
.../kv/LevelDbKeyValueStoreMetrics.scala | 41 ---
.../apache/samza/storage/kv/LoggedStore.scala | 7 +-
.../samza/storage/kv/LoggedStoreMetrics.scala | 3 +-
.../storage/kv/NullSafeKeyValueStore.scala | 1 +
.../storage/kv/SerializedKeyValueStore.scala | 3 +-
.../kv/SerializedKeyValueStoreMetrics.scala | 3 +-
.../samza/storage/kv/TestKeyValueStores.scala | 316 -----------------
.../performance/TestKeyValuePerformance.scala | 2 -
settings.gradle | 2 +-
25 files changed, 1037 insertions(+), 704 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index c853fc0..6d0ac97 100644
--- a/build.gradle
+++ b/build.gradle
@@ -291,6 +291,34 @@ project(":samza-kv_$scalaVersion") {
compile project(":samza-core_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
+ testCompile "junit:junit:$junitVersion"
+ }
+}
+
+// In-memory key-value store module: InMemoryKeyValueStore and InMemoryKeyValueStorageEngineFactory.
+project(":samza-kv-inmemory_$scalaVersion") {
+ apply plugin: 'scala'
+
+ dependencies {
+ compile project(':samza-api')
+ compile project(":samza-core_$scalaVersion")
+ compile project(":samza-kv_$scalaVersion")
+ compile "org.scala-lang:scala-library:$scalaLibVersion"
+ compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
+ // Guava supplies UnsignedBytes.lexicographicalComparator(), which orders the store's keys.
+ compile "com.google.guava:guava:$guavaVersion"
+ testCompile "junit:junit:$junitVersion"
+ }
+}
+
+project(":samza-kv-leveldb_$scalaVersion") {
+ apply plugin: 'scala'
+
+ dependencies {
+ compile project(':samza-api')
+ compile project(":samza-core_$scalaVersion")
+ compile project(":samza-kv_$scalaVersion")
+ compile project(":samza-kv-inmemory_$scalaVersion")
+ compile "org.scala-lang:scala-library:$scalaLibVersion"
+ compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
@@ -309,7 +337,7 @@ project(":samza-test_$scalaVersion") {
dependencies {
compile project(':samza-api')
- compile project(":samza-kv_$scalaVersion")
+ compile project(":samza-kv-leveldb_$scalaVersion")
compile project(":samza-core_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 0787258..7373582 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -31,4 +31,5 @@
leveldbVersion = "1.8"
yarnVersion = "2.4.0"
slf4jVersion = "1.6.2"
+ guavaVersion = "17.0"
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..53147ad
--- /dev/null
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory
+
+import java.io.File
+
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.storage.kv.{KeyValueStoreMetrics, BaseKeyValueStorageEngineFactory, KeyValueStore}
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * Storage engine factory whose stores keep all data in an InMemoryKeyValueStore.
+ *
+ * Only storeName and registry are used (to create the store's metrics); storeDir,
+ * changeLogSystemStreamPartition and containerContext are ignored by this factory.
+ */
+class InMemoryKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
+
+  override def getKVStore(storeName: String,
+                          storeDir: File,
+                          registry: MetricsRegistry,
+                          changeLogSystemStreamPartition: SystemStreamPartition,
+                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] =
+    new InMemoryKeyValueStore(new KeyValueStoreMetrics(storeName, registry))
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
new file mode 100644
index 0000000..8e1493a
--- /dev/null
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory
+
+import com.google.common.primitives.UnsignedBytes
+import grizzled.slf4j.Logging
+import org.apache.samza.storage.kv.{KeyValueStoreMetrics, KeyValueIterator, Entry, KeyValueStore}
+import java.util
+
+/**
+ * In memory implementation of a key value store.
+ *
+ * Entries live in a java.util.TreeMap ordered by Guava's unsigned lexicographical
+ * byte-array comparator, so all() and range() return keys in unsigned byte order.
+ * Nothing is persisted: flush() and close() perform no I/O.
+ *
+ * @param metrics A metrics instance to publish key-value store related statistics
+ */
+class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics)
+  extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+
+  /** Key ordering: unsigned, byte-by-byte lexicographic comparison. */
+  private val keyOrdering = UnsignedBytes.lexicographicalComparator()
+
+  val underlying = new util.TreeMap[Array[Byte], Array[Byte]](keyOrdering)
+
+  override def flush(): Unit = {
+    // Nothing to flush for an in-memory store; only record the call.
+    metrics.flushes.inc
+  }
+
+  override def close(): Unit = {
+    // No-op: this store holds no external resources.
+  }
+
+  /**
+   * Wraps an iterator over the entries of the given (sub)map. Every returned key and
+   * value is counted in metrics.bytesRead; remove() deletes the last returned entry
+   * from the backing map; close() is a no-op.
+   */
+  private def getIter(tm: util.SortedMap[Array[Byte], Array[Byte]]) = {
+    new KeyValueIterator[Array[Byte], Array[Byte]] {
+      val iter = tm.entrySet().iterator()
+
+      override def close(): Unit = {}
+
+      override def remove(): Unit = iter.remove()
+
+      override def next(): Entry[Array[Byte], Array[Byte]] = {
+        val n = iter.next()
+        if (n != null && n.getKey != null) {
+          metrics.bytesRead.inc(n.getKey.size)
+        }
+        if (n != null && n.getValue != null) {
+          metrics.bytesRead.inc(n.getValue.size)
+        }
+        new Entry(n.getKey, n.getValue)
+      }
+
+      override def hasNext: Boolean = iter.hasNext
+    }
+  }
+
+  override def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.alls.inc
+    getIter(underlying)
+  }
+
+  /**
+   * Iterates over keys k with from <= k < to.
+   *
+   * An inverted range (from > to) yields an empty iterator. TreeMap.subMap would throw
+   * IllegalArgumentException here; LevelDbKeyValueStore simply returns nothing.
+   */
+  override def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.ranges.inc
+    require(from != null && to != null, "Null bound not allowed.")
+    if (keyOrdering.compare(from, to) > 0) {
+      getIter(new util.TreeMap[Array[Byte], Array[Byte]](keyOrdering))
+    } else {
+      getIter(underlying.subMap(from, to))
+    }
+  }
+
+  override def delete(key: Array[Byte]): Unit = {
+    // put(key, null) already increments metrics.deletes (and metrics.puts); incrementing
+    // it here as well would count every delete twice.
+    put(key, null)
+  }
+
+  override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = {
+    // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
+    // to use it, in order to putAll here. Therefore, just iterate here.
+    val iter = entries.iterator()
+    while (iter.hasNext) {
+      val next = iter.next()
+      put(next.getKey, next.getValue)
+    }
+  }
+
+  /** Stores value under key; a null value removes the key and is counted as a delete. */
+  override def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    metrics.puts.inc
+    require(key != null, "Null key not allowed.")
+    if (value == null) {
+      metrics.deletes.inc
+      underlying.remove(key)
+    } else {
+      metrics.bytesWritten.inc(key.size + value.size)
+      underlying.put(key, value)
+    }
+  }
+
+  /** Returns the value stored under key, or null if the key is absent. */
+  override def get(key: Array[Byte]): Array[Byte] = {
+    metrics.gets.inc
+    require(key != null, "Null key not allowed.")
+    val found = underlying.get(key)
+    if (found != null) {
+      metrics.bytesRead.inc(found.size)
+    }
+    found
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..0ba4f8a
--- /dev/null
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * A backwards compatible factory that points to LevelDb. This exists for all the old Samza jobs
+ * that still refer to KeyValueStorageEngineFactory.
+ *
+ * Store creation is delegated to LevelDbKeyValueStorageEngineFactory.getKeyValueStore, the same
+ * method LevelDbKeyValueStorageEngineFactory itself uses.
+ */
+class KeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
+
+  override def getKVStore(storeName: String,
+                          storeDir: File,
+                          registry: MetricsRegistry,
+                          changeLogSystemStreamPartition: SystemStreamPartition,
+                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] =
+    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(
+      storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..9642823
--- /dev/null
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+import org.apache.samza.config.Config
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StorageEngineFactory
+import org.apache.samza.storage.StorageEngine
+
+object LevelDbKeyValueStorageEngineFactory {
+  /**
+   * Creates the LevelDB-backed byte store for storeName in storeDir.
+   *
+   * Settings come from the "stores.<storeName>." config subset: the optional
+   * compaction.delete.threshold (default -1) and the LevelDB options built by
+   * LevelDbKeyValueStore.options. changeLogSystemStreamPartition is not used here.
+   */
+  def getKeyValueStore(storeName: String,
+                       storeDir: File,
+                       registry: MetricsRegistry,
+                       changeLogSystemStreamPartition: SystemStreamPartition,
+                       containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+    val storeConfig = containerContext.config.subset("stores." + storeName + ".", true)
+    val compactionThreshold = storeConfig.getInt("compaction.delete.threshold", -1)
+    val storeMetrics = new KeyValueStoreMetrics(storeName, registry)
+    val dbOptions = LevelDbKeyValueStore.options(storeConfig, containerContext)
+
+    new LevelDbKeyValueStore(storeDir, dbOptions, compactionThreshold, storeMetrics)
+  }
+
+}
+
+/**
+ * Storage engine factory whose stores are backed by LevelDB; creation is delegated to the
+ * companion object's getKeyValueStore.
+ */
+class LevelDbKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
+
+  override def getKVStore(storeName: String,
+                          storeDir: File,
+                          registry: MetricsRegistry,
+                          changeLogSystemStreamPartition: SystemStreamPartition,
+                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] =
+    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(
+      storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
new file mode 100644
index 0000000..751fe4c
--- /dev/null
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.nio.ByteBuffer
+import org.iq80.leveldb._
+import org.fusesource.leveldbjni.internal.NativeComparator
+import org.fusesource.leveldbjni.JniDBFactory._
+import java.io._
+import java.util.Iterator
+import java.lang.Iterable
+import org.apache.samza.config.Config
+import org.apache.samza.container.SamzaContainerContext
+import grizzled.slf4j.{ Logger, Logging }
+
+object LevelDbKeyValueStore {
+  private lazy val logger = Logger(classOf[LevelDbKeyValueStore])
+
+  /**
+   * Builds the LevelDB Options for one store from its config subset.
+   *
+   * container.cache.size.bytes and container.write.buffer.size.bytes are per-container
+   * budgets, so each is divided by the number of partitions in the container. The
+   * resulting options create the database if missing and fail if it already exists.
+   */
+  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
+    val containerCacheBytes = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
+    val containerWriteBufBytes = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
+    val partitionCount = containerContext.partitions.size
+    val opts = new Options
+
+    opts.cacheSize(containerCacheBytes / partitionCount)
+    opts.writeBufferSize((containerWriteBufBytes / partitionCount).toInt)
+    opts.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
+    opts.compressionType(compressionFor(storeConfig.get("leveldb.compression", "snappy")))
+    opts.createIfMissing(true)
+    opts.errorIfExists(true)
+    opts
+  }
+
+  /** Maps a leveldb.compression value to a CompressionType; unknown codecs fall back to Snappy. */
+  private def compressionFor(codec: String): CompressionType = codec match {
+    case "snappy" => CompressionType.SNAPPY
+    case "none" => CompressionType.NONE
+    case unknown =>
+      logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format unknown)
+      CompressionType.SNAPPY
+  }
+}
+
+/**
+ * A KeyValueStore of raw byte arrays backed by a LevelDB database (opened through
+ * leveldbjni) in dir.
+ *
+ * The database is opened lazily, on first use. get(), range() and all() first call
+ * maybeCompact, which may run a full compaction once enough deletes have accumulated.
+ *
+ * @param dir Directory holding the LevelDB files.
+ * @param options Options used to open the database. When options.comparator is null,
+ *                range() bounds keys with unsigned lexicographic byte comparison.
+ * @param metrics Metrics instance to publish key-value store related statistics.
+ */
+class LevelDbKeyValueStore(
+  val dir: File,
+  val options: Options,
+
+  /**
+   * How many deletes must occur before we will force a compaction. This is to
+   * get around performance issues discovered in SAMZA-254. A value of -1
+   * (or any other negative value) disables this feature.
+   */
+  val deleteCompactionThreshold: Int = -1,
+  val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+
+  // Opened on first access.
+  private lazy val db = factory.open(dir, options)
+  // Fallback key ordering for range() when options has no comparator.
+  private val lexicographic = new LexicographicComparator()
+  // Deletes issued since the last full compaction; consulted by maybeCompact.
+  private var deletesSinceLastCompaction = 0
+
+  /** Returns the value stored under key, or null if absent. */
+  def get(key: Array[Byte]): Array[Byte] = {
+    maybeCompact
+    metrics.gets.inc
+    require(key != null, "Null key not allowed.")
+    val found = db.get(key)
+    if (found != null) {
+      metrics.bytesRead.inc(found.size)
+    }
+    found
+  }
+
+  /** Stores value under key; a null value deletes the key and is counted as a delete. */
+  def put(key: Array[Byte], value: Array[Byte]) {
+    metrics.puts.inc
+    require(key != null, "Null key not allowed.")
+    if (value == null) {
+      // Count deletes here, matching putAll, so every delete path is counted exactly once.
+      metrics.deletes.inc
+      db.delete(key)
+      deletesSinceLastCompaction += 1
+    } else {
+      metrics.bytesWritten.inc(key.size + value.size)
+      db.put(key, value)
+    }
+  }
+
+  /**
+   * Writes all entries in a single LevelDB write batch. Entries with a null value are
+   * deletes. Every entry counts as a put; null-valued entries also count as deletes.
+   */
+  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
+    val batch = db.createWriteBatch()
+    var wrote = 0
+    var deletes = 0
+    try {
+      val iter = entries.iterator
+      while (iter.hasNext) {
+        wrote += 1
+        val curr = iter.next()
+        if (curr.getValue == null) {
+          deletes += 1
+          batch.delete(curr.getKey)
+        } else {
+          val key = curr.getKey
+          val value = curr.getValue
+          metrics.bytesWritten.inc(key.size + value.size)
+          batch.put(key, value)
+        }
+      }
+      db.write(batch)
+    } finally {
+      // Release the native batch even when building or writing it fails.
+      batch.close
+    }
+    metrics.puts.inc(wrote)
+    metrics.deletes.inc(deletes)
+    deletesSinceLastCompaction += deletes
+  }
+
+  def delete(key: Array[Byte]) {
+    // put(key, null) counts the delete; incrementing metrics.deletes here too would double count.
+    put(key, null)
+  }
+
+  /** Iterates over keys k with from <= k < to, in the store's key order. */
+  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    maybeCompact
+    metrics.ranges.inc
+    require(from != null && to != null, "Null bound not allowed.")
+    new LevelDbRangeIterator(db.iterator, from, to)
+  }
+
+  /** Iterates over every entry, starting from the first key. */
+  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    maybeCompact
+    metrics.alls.inc
+    val iter = db.iterator()
+    iter.seekToFirst()
+    new LevelDbIterator(iter)
+  }
+
+  /**
+   * Trigger a complete compaction on the LevelDB store if there have been at
+   * least deleteCompactionThreshold deletes since the last compaction.
+   */
+  def maybeCompact = {
+    if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
+      compact
+    }
+  }
+
+  /**
+   * Trigger a complete compaction of the LevelDB store.
+   */
+  def compact {
+    // According to LevelDB's docs:
+    // begin==NULL is treated as a key before all keys in the database.
+    // end==NULL is treated as a key after all keys in the database.
+    db.compactRange(null, null)
+    deletesSinceLastCompaction = 0
+  }
+
+  def flush {
+    metrics.flushes.inc
+    // TODO can't find a flush for leveldb
+    trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
+  }
+
+  def close() {
+    trace("Closing.")
+
+    db.close()
+  }
+
+  /**
+   * Adapts a DBIterator to KeyValueIterator, counting returned key and value bytes in
+   * metrics.bytesRead. An iterator still open at finalization is closed, with a warning
+   * printed to stderr.
+   */
+  class LevelDbIterator(iter: DBIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
+    private var open = true
+    def close() = {
+      open = false
+      iter.close()
+    }
+    def remove() = iter.remove()
+    def hasNext() = iter.hasNext()
+    def next() = {
+      if (!hasNext()) {
+        throw new NoSuchElementException
+      }
+
+      val curr = iter.next
+      val key = curr.getKey
+      val value = curr.getValue
+      metrics.bytesRead.inc(key.size)
+      if (value != null) {
+        metrics.bytesRead.inc(value.size)
+      }
+      new Entry(key, value)
+    }
+    override def finalize() {
+      if (open) {
+        System.err.println("Leaked reference to level db iterator, forcing close.")
+        close()
+      }
+    }
+  }
+
+  /** A LevelDbIterator that starts at from and stops before the first key >= to. */
+  class LevelDbRangeIterator(iter: DBIterator, from: Array[Byte], to: Array[Byte]) extends LevelDbIterator(iter) {
+    val comparator = if (options.comparator == null) lexicographic else options.comparator
+    iter.seek(from)
+    override def hasNext() = {
+      iter.hasNext() && comparator.compare(iter.peekNext.getKey, to) < 0
+    }
+  }
+
+  /**
+   * Compare two array lexicographically using unsigned byte arithmetic
+   */
+  class LexicographicComparator extends DBComparator {
+    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
+      val l = math.min(k1.length, k2.length)
+      var i = 0
+      while (i < l) {
+        if (k1(i) != k2(i))
+          return (k1(i) & 0xff) - (k2(i) & 0xff)
+        i += 1
+      }
+      // okay prefixes are equal, the shorter array is less
+      k1.length - k2.length
+    }
+    def name(): String = "lexicographic"
+    def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
+    def findShortSuccessor(key: Array[Byte]) = key
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
new file mode 100644
index 0000000..d6d437a
--- /dev/null
+++ b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+import java.util.Arrays
+import java.util.Random
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
+
+import scala.collection.JavaConversions._
+import org.iq80.leveldb.Options
+import org.junit.After
+import org.junit.Assert._
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.apache.samza.serializers.Serde
+import org.scalatest.Assertions.intercept
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Test suite to check different key value store operations
+ * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / "inmemory")
+ * @param storeConfig Defines whether we're using caching / serde / both / or none in front of the store
+ */
+@RunWith(value = classOf[Parameterized])
+class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
+ import TestKeyValueStores._
+
+ val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
+ val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
+ var store: KeyValueStore[Array[Byte], Array[Byte]] = null
+ var cache = false
+ var serde = false
+
+ @Before
+ def setup() {
+ val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) {
+ dir.mkdirs()
+ val leveldb = new LevelDbKeyValueStore(dir, new Options)
+ leveldb
+ } else if ("inmemory".equals(typeOfStore)) {
+ val inmemoryDb = new InMemoryKeyValueStore
+ inmemoryDb
+ } else {
+ throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
+ }
+
+ val passThroughSerde = new Serde[Array[Byte]] {
+ def toBytes(obj: Array[Byte]) = obj
+ def fromBytes(bytes: Array[Byte]) = bytes
+ }
+ store = if ("cache".equals(storeConfig)) {
+ cache = true
+ new CachedStore(kvStore, CacheSize, BatchSize)
+ } else if ("serde".equals(storeConfig)) {
+ serde = true
+ new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+ } else if ("cache-and-serde".equals(storeConfig)) {
+ val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+ serde = true
+ cache = true
+ new CachedStore(serializedStore, CacheSize, BatchSize)
+ } else {
+ kvStore
+ }
+
+ store = new NullSafeKeyValueStore(store)
+ }
+
+ @After
+ def teardown() {
+ store.close
+ if (dir != null && dir.listFiles() != null) {
+ for (file <- dir.listFiles)
+ file.delete()
+ dir.delete()
+ }
+ }
+
+ @Test
+ def getNonExistantIsNull() {
+ assertNull(store.get(b("hello")))
+ }
+
+ @Test
+ def putAndGet() {
+ store.put(b("k"), b("v"))
+ assertTrue(Arrays.equals(b("v"), store.get(b("k"))))
+ }
+
+ @Test
+ def doublePutAndGet() {
+ val k = b("k2")
+ store.put(k, b("v1"))
+ store.put(k, b("v2"))
+ store.put(k, b("v3"))
+ assertTrue(Arrays.equals(b("v3"), store.get(k)))
+ }
+
+ @Test
+ def testNullsWithSerde() {
+ if (serde) {
+ val a = b("a")
+ val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
+ val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
+
+ intercept[NullPointerException] { store.get(null) }
+ intercept[NullPointerException] { store.delete(null) }
+ intercept[NullPointerException] { store.put(null, a) }
+ intercept[NullPointerException] { store.put(a, null) }
+ intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
+ intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
+ intercept[NullPointerException] { store.range(a, null) }
+ intercept[NullPointerException] { store.range(null, a) }
+ }
+ }
+
+ @Test
+ def testPutAll() {
+ // Use CacheSize - 1 so we fully fill the cache, but don't write any data
+ // out. Our check (below) uses == for cached entries, and using
+ // numEntires >= CacheSize would result in the LRU cache dropping some
+ // entries. The result would be that we get the correct byte array back
+ // from the cache's underlying store (leveldb), but that == would fail.
+ val numEntries = CacheSize - 1
+ val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
+ store.putAll(entries)
+ if (cache) {
+ assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue))
+ } else {
+ assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue)))
+ }
+ }
+
+ @Test
+ def testIterateAll() {
+ for (letter <- letters)
+ store.put(b(letter.toString), b(letter.toString))
+ val iter = store.all
+ checkRange(letters, iter)
+ iter.close()
+ }
+
+ @Test
+ def testRange() {
+ val from = 5
+ val to = 20
+ for (letter <- letters)
+ store.put(b(letter.toString), b(letter.toString))
+
+ val iter = store.range(b(letters(from)), b(letters(to)))
+ checkRange(letters.slice(from, to), iter)
+ iter.close()
+ }
+
+ @Test
+ def testDelete() {
+ val a = b("a")
+ assertNull(store.get(a))
+ store.put(a, a)
+ assertTrue(Arrays.equals(a, store.get(a)))
+ store.delete(a)
+ assertNull(store.get(a))
+ }
+
+ @Test
+ def testSimpleScenario() {
+ val vals = letters.map(b(_))
+ for (v <- vals) {
+ assertNull(store.get(v))
+ store.put(v, v)
+ assertTrue(Arrays.equals(v, store.get(v)))
+ }
+ vals.foreach(v => assertTrue(Arrays.equals(v, store.get(v))))
+ vals.foreach(v => store.delete(v))
+ vals.foreach(v => assertNull(store.get(v)))
+ }
+
+ /**
+ * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
+ * implementation. In short, it is broken. More specifically,
+ * creating a DoubleLinkedList from an existing list does not update the
+ * "prev" field of the existing list's head to point to the new head. As a
+ * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
+ * Samza gets around this by manually updating the field itself. See SAMZA-80
+ * for details.
+ *
+ * This issue is exposed in Samza's KV cache implementation, which uses
+ * DoubleLinkedList, so all comments in this method are discussing the cached
+ * implementation, but the test is still useful as a sanity check for
+ * non-cached stores.
+ *
+ * The test makes no assertions: it passes as long as none of the store
+ * operations (in particular the final flush) throws.
+ */
+ @Test
+ def testBrokenScalaDoubleLinkedList() {
+ val something = b("")
+ val keys = letters
+ .map(b(_))
+ .toArray
+
+ // Load the cache to capacity.
+ letters
+ .slice(0, TestKeyValueStores.CacheSize)
+ .map(b(_))
+ .foreach(store.put(_, something))
+
+ // Now keep everything in the cache, but with an empty dirty list.
+ store.flush
+
+ // Dirty list is now empty, and every CacheEntry has dirty=null.
+
+ // Corrupt the dirty list by creating two dirty lists that toggle back and
+ // forth depending on whether the last dirty write was to 1 or 0. The trick
+ // here is that every element in the cache is treated as the "head" of the
+ // DoubleLinkedList (prev==null), even though it may not be. Thus,
+ // you can end up with multiple nodes each having their own version of the
+ // dirty list with different elements in them.
+ store.put(keys(1), something)
+ store.put(keys(0), something)
+ store.put(keys(1), something)
+ store.flush
+ // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
+ store.put(keys(0), something)
+ // The dirty list now has 0 and 1, but 1's dirty field is null in the
+ // cache because it was just flushed.
+
+ // Get rid of 1 from the cache by reading every remaining element
+ // (2 until CacheSize), and then putting one new element so the least
+ // recently used entry (1) is evicted.
+ letters
+ .slice(2, TestKeyValueStores.CacheSize)
+ .map(b(_))
+ .foreach(store.get(_))
+ store.put(keys(TestKeyValueStores.CacheSize), something)
+
+ // Now try and trigger an NPE since the dirty list has an element (1)
+ // that's no longer in the cache.
+ store.flush
+ }
+
+ /**
+  * A little test that tries to simulate a few common patterns:
+  * read-modify-write, and do-some-stuff-then-delete (windowing).
+  *
+  * Each of 100 rounds performs 30 random read-then-write operations and
+  * checks that every written key holds its latest value. It then deletes
+  * every key returned by all() and checks that nothing expected is left.
+  */
+ @Test
+ def testRandomReadWriteRemove() {
+   // Make test deterministic by seeding the random number generator.
+   val rand = new Random(12345)
+   val keys = letters
+     .map(b(_))
+     .toArray
+
+   // Map from letter to key byte array used for letter, and expected value.
+   // We have to go through some acrobatics here since Java's byte array uses
+   // object identity for .equals. Two byte arrays can have identical byte
+   // elements, but not be equal.
+   var expected = Map[String, (Array[Byte], String)]()
+
+   for (_ <- 0 until 100) {
+     for (_ <- 0 until 30) {
+       val idx = rand.nextInt(keys.length)
+       val randomValue = letters(rand.nextInt(keys.length))
+       val key = keys(idx)
+       // Read before writing to mimic a read-modify-write access pattern;
+       // the value read is deliberately unused.
+       store.get(key)
+       store.put(key, b(randomValue))
+       expected += letters(idx) -> (key, randomValue)
+     }
+
+     for ((_, (keyBytes, value)) <- expected) {
+       val bytes = store.get(keyBytes)
+       assertNotNull(bytes)
+       assertEquals(value, new String(bytes, "UTF-8"))
+     }
+
+     // Collect every key first and delete afterwards, so the store is not
+     // structurally modified while an iterator over it is still open.
+     val iterator = store.all
+     val allKeys = new ArrayBuffer[Array[Byte]]()
+     try {
+       while (iterator.hasNext) {
+         allKeys += iterator.next.getKey
+       }
+     } finally {
+       iterator.close
+     }
+
+     for (key <- allKeys) {
+       store.delete(key)
+       expected -= new String(key, "UTF-8")
+     }
+
+     assertEquals(0, expected.size)
+   }
+ }
+
+ /**
+  * Asserts that `iter` yields exactly `vals`, in order, with each entry's key
+  * and value both decoding to the expected string. Afterwards the iterator
+  * must be exhausted: hasNext is false and next() throws
+  * NoSuchElementException.
+  */
+ def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) {
+   vals.foreach { expectedString =>
+     assertTrue(iter.hasNext)
+     val entry = iter.next()
+     assertEquals(expectedString, s(entry.getKey))
+     assertEquals(expectedString, s(entry.getValue))
+   }
+   assertFalse(iter.hasNext)
+   intercept[NoSuchElementException] {
+     iter.next()
+   }
+ }
+
+ /**
+  * Convert a string to its UTF-8 encoded byte array. The charset is explicit
+  * so test keys do not depend on the JVM's platform default encoding, and
+  * they match the `new String(bytes, "UTF-8")` decoding used in
+  * testRandomReadWriteRemove.
+  */
+ def b(s: String) =
+   s.getBytes("UTF-8")
+
+ /**
+  * Convert a byte array to a string by decoding it as UTF-8, so the result
+  * does not depend on the JVM's platform default encoding.
+  */
+ def s(b: Array[Byte]) =
+   new String(b, "UTF-8")
+}
+
+object TestKeyValueStores {
+  val CacheSize = 10
+  val BatchSize = 5
+
+  /**
+   * Every combination of backing store ("leveldb", "inmemory") and wrapper
+   * configuration ("cache", "serde", "cache-and-serde", "none"), grouped by
+   * backing store. Each element is one parameter set, presumably consumed by
+   * JUnit's Parameterized runner via @Parameters.
+   */
+  @Parameters
+  def parameters: java.util.Collection[Array[String]] = {
+    val storeTypes = Seq("leveldb", "inmemory")
+    val wrappers = Seq("cache", "serde", "cache-and-serde", "none")
+    val combinations = for {
+      storeType <- storeTypes
+      wrapper <- wrappers
+    } yield Array(storeType, wrapper)
+    Arrays.asList(combinations: _*)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
new file mode 100644
index 0000000..b3624e6
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+
+import org.apache.samza.SamzaException
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.serializers.Serde
+import org.apache.samza.storage.{StorageEngine, StorageEngineFactory}
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.MessageCollector
+
+/**
+ * Base trait for key value storage engine factories.
+ *
+ * Implementations only provide the underlying byte-array store via getKVStore. getStorageEngine validates the
+ * store's configuration and serdes, then wraps that store, innermost first, in:
+ *  - a LoggedStore (only when a changelog partition is given),
+ *  - a SerializedKeyValueStore,
+ *  - a CachedStore (only when object.cache.size > 0),
+ *  - and finally a NullSafeKeyValueStore.
+ */
+trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
+
+  /**
+   * Return a KeyValueStore instance for the given store name
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A valid KeyValueStore instance
+   */
+  def getKVStore(storeName: String,
+                 storeDir: File,
+                 registry: MetricsRegistry,
+                 changeLogSystemStreamPartition: SystemStreamPartition,
+                 containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]]
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller.
+   *
+   * Reads two settings from the container config:
+   *  - `stores.<storeName>.write.batch.size` (default 500);
+   *  - `stores.<storeName>.object.cache.size` (default max(batch size, 1000)). A value <= 0 disables the cache.
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
+   * @param collector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog; null disables
+   *                                       changelogging.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return a KeyValueStorageEngine over the fully wrapped store, which is also handed the raw store from getKVStore
+   * @throws SamzaException if the cache is enabled but smaller than the write batch size, or if either serde is null
+   **/
+  def getStorageEngine(storeName: String,
+                       storeDir: File,
+                       keySerde: Serde[K],
+                       msgSerde: Serde[V],
+                       collector: MessageCollector,
+                       registry: MetricsRegistry,
+                       changeLogSystemStreamPartition: SystemStreamPartition,
+                       containerContext: SamzaContainerContext): StorageEngine = {
+
+    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
+    val batchSize = storageConfig.getInt("write.batch.size", 500)
+    val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000))
+    val enableCache = cacheSize > 0
+
+    // Batched writes reside in the cache, so an enabled cache must be able to hold a full batch.
+    if (enableCache && cacheSize < batchSize) {
+      throw new SamzaException(
+        "Store %s: object.cache.size (%d) cannot be less than write.batch.size (%d) as batched values reside in cache."
+          .format(storeName, cacheSize, batchSize))
+    }
+
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value storage.")
+    }
+
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key value storage.")
+    }
+
+    val kvStore = getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
+
+    // Only log changes when a changelog partition has been configured.
+    val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
+      kvStore
+    } else {
+      val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
+      new LoggedStore(kvStore, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
+    }
+
+    val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
+    val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde, serializedMetrics)
+    val maybeCachedStore = if (enableCache) {
+      val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
+      new CachedStore(serialized, cacheSize, batchSize, cachedStoreMetrics)
+    } else {
+      serialized
+    }
+    val db = new NullSafeKeyValueStore(maybeCachedStore)
+    val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
+
+    // TODO: Decide if we should use raw bytes when restoring
+
+    new KeyValueStorageEngine(db, kvStore, keyValueStorageEngineMetrics, batchSize)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 429f51a..5764093 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -19,9 +19,10 @@
package org.apache.samza.storage.kv
-import scala.collection._
import grizzled.slf4j.Logging
+import scala.collection._
+
/**
* A write-behind caching layer around the leveldb store. The purpose of this cache is three-fold:
* 1. Batch together writes to leveldb, this turns out to be a great optimization
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
index 8ffcfb1..df8efae 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
@@ -19,10 +19,9 @@
package org.apache.samza.storage.kv
+import org.apache.samza.metrics.MetricsHelper
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
class CachedStoreMetrics(
val storeName: String = "unknown",
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index f42ea02..c084144 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -19,15 +19,11 @@
package org.apache.samza.storage.kv
-import java.io.File
-import java.nio.ByteBuffer
-import org.apache.samza._
-import org.apache.samza.config.Config
-import org.apache.samza.serializers._
-import scala.collection.JavaConversions._
import grizzled.slf4j.Logging
-import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.storage.StorageEngine
+import org.apache.samza.system.IncomingMessageEnvelope
+
+import scala.collection.JavaConversions._
/**
* A key value store.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
deleted file mode 100644
index 36b7e97..0000000
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.serializers._
-import org.apache.samza.SamzaException
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.StorageEngine
-
-/**
- * Builds a LevelDB-backed key value storage engine. On top of a
- * LevelDbKeyValueStore it layers, innermost first:
- *  - an optional LoggedStore (only when a changelog partition is given),
- *  - a SerializedKeyValueStore,
- *  - an optional CachedStore (only when object.cache.size > 0),
- *  - a NullSafeKeyValueStore.
- */
-class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
- def getStorageEngine(
- storeName: String,
- storeDir: File,
- keySerde: Serde[K],
- msgSerde: Serde[V],
- collector: MessageCollector,
- registry: MetricsRegistry,
- changeLogSystemStreamPartition: SystemStreamPartition,
- containerContext: SamzaContainerContext): StorageEngine = {
-
- val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
- val batchSize = storageConfig.getInt("write.batch.size", 500)
- val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000))
- // Passed to LevelDbKeyValueStore; its default of -1 disables delete-triggered compaction.
- val deleteCompactionThreshold = storageConfig.getInt("compaction.delete.threshold", -1)
- val enableCache = cacheSize > 0
-
- // NOTE(review): the message names "cache.size"/"batch.size", but the keys read above are
- // "object.cache.size"/"write.batch.size".
- if (cacheSize > 0 && cacheSize < batchSize) {
- throw new SamzaException("A stores cache.size cannot be less than batch.size as batched values reside in cache.")
- }
-
- if (keySerde == null) {
- throw new SamzaException("Must define a key serde when using key value storage.")
- }
-
- if (msgSerde == null) {
- throw new SamzaException("Must define a message serde when using key value storage.")
- }
-
- val levelDbMetrics = new LevelDbKeyValueStoreMetrics(storeName, registry)
- val levelDbOptions = LevelDbKeyValueStore.options(storageConfig, containerContext)
- val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, deleteCompactionThreshold, levelDbMetrics)
- val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
- levelDb
- } else {
- val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
- new LoggedStore(levelDb, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
- }
- val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
- val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde, serializedMetrics)
- val maybeCachedStore = if (enableCache) {
- val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
- new CachedStore(serialized, cacheSize, batchSize, cachedStoreMetrics)
- } else {
- serialized
- }
- val db = new NullSafeKeyValueStore(maybeCachedStore)
- val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
-
- // Decide if we should use raw bytes when restoring
-
- new KeyValueStorageEngine(db, levelDb, keyValueStorageEngineMetrics, batchSize)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index a7958f6..233fba9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -19,10 +19,9 @@
package org.apache.samza.storage.kv
+import org.apache.samza.metrics.MetricsHelper
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
class KeyValueStorageEngineMetrics(
val storeName: String = "unknown",
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
new file mode 100644
index 0000000..79092b9
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.{MetricsHelper, MetricsRegistry, MetricsRegistryMap}
+
+/**
+ * Counters for a key value store's operations and byte traffic.
+ *
+ * Every metric name is prefixed with "<storeName>-" (see getPrefix). When no
+ * registry is supplied, the counters are registered in a new
+ * MetricsRegistryMap. Which operations increment which counter is decided by
+ * the store that owns this object, not here.
+ */
+class KeyValueStoreMetrics(
+ val storeName: String = "unknown",
+ val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+ // Operation counts.
+ val gets = newCounter("gets")
+ val ranges = newCounter("ranges")
+ val alls = newCounter("alls")
+ val puts = newCounter("puts")
+ val deletes = newCounter("deletes")
+ val flushes = newCounter("flushes")
+ // Byte volume counters.
+ val bytesWritten = newCounter("bytes-written")
+ val bytesRead = newCounter("bytes-read")
+
+ override def getPrefix = storeName + "-"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
deleted file mode 100644
index 72562cf..0000000
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.nio.ByteBuffer
-import org.iq80.leveldb._
-import org.fusesource.leveldbjni.internal.NativeComparator
-import org.fusesource.leveldbjni.JniDBFactory._
-import java.io._
-import java.util.Iterator
-import java.lang.Iterable
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import grizzled.slf4j.{ Logger, Logging }
-
-/**
- * Companion helpers for LevelDbKeyValueStore.
- */
-object LevelDbKeyValueStore {
- private lazy val logger = Logger(classOf[LevelDbKeyValueStore])
-
- /**
- * Builds LevelDB Options for one store from its config.
- *
- * The container-wide cache and write buffer sizes are divided evenly across
- * containerContext.partitions. An unknown leveldb.compression value falls
- * back to Snappy with a warning.
- */
- def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
- val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
- val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
- val options = new Options
-
- // Cache size and write buffer size are specified on a per-container basis.
- options.cacheSize(cacheSize / containerContext.partitions.size)
- options.writeBufferSize((writeBufSize / containerContext.partitions.size).toInt)
- options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
- options.compressionType(
- storeConfig.get("leveldb.compression", "snappy") match {
- case "snappy" => CompressionType.SNAPPY
- case "none" => CompressionType.NONE
- case _ =>
- logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format storeConfig.get("leveldb.compression", "snappy"))
- CompressionType.SNAPPY
- })
- // Create the database when missing; with errorIfExists set, opening a directory
- // that already holds a database is treated as an error.
- options.createIfMissing(true)
- options.errorIfExists(true)
- options
- }
-}
-
-/**
- * A byte-array KeyValueStore backed by a LevelDB database that is opened
- * lazily (on first use) in `dir` with the given `options`.
- */
-class LevelDbKeyValueStore(
- val dir: File,
- val options: Options,
-
- /**
- * How many deletes must occur before we will force a compaction. This is to
- * get around performance issues discovered in SAMZA-254. A value of -1
- * disables this feature.
- */
- val deleteCompactionThreshold: Int = -1,
- val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
-
- private lazy val db = factory.open(dir, options)
- private val lexicographic = new LexicographicComparator()
- private var deletesSinceLastCompaction = 0
-
- def get(key: Array[Byte]): Array[Byte] = {
- maybeCompact
- metrics.gets.inc
- require(key != null, "Null key not allowed.")
- val found = db.get(key)
- if (found != null) {
- metrics.bytesRead.inc(found.size)
- }
- found
- }
-
- // A null value deletes the key. NOTE(review): that path does not increment
- // metrics.deletes itself; only delete() and putAll() do.
- def put(key: Array[Byte], value: Array[Byte]) {
- metrics.puts.inc
- require(key != null, "Null key not allowed.")
- if (value == null) {
- db.delete(key)
- deletesSinceLastCompaction += 1
- } else {
- metrics.bytesWritten.inc(key.size + value.size)
- db.put(key, value)
- }
- }
-
- // Applies all entries in a single LevelDB write batch; null values become deletes.
- // NOTE(review): `wrote` counts deletes too, so metrics.puts also includes them.
- def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
- val batch = db.createWriteBatch()
- val iter = entries.iterator
- var wrote = 0
- var deletes = 0
- while (iter.hasNext) {
- wrote += 1
- val curr = iter.next()
- if (curr.getValue == null) {
- deletes += 1
- batch.delete(curr.getKey)
- } else {
- val key = curr.getKey
- val value = curr.getValue
- metrics.bytesWritten.inc(key.size + value.size)
- batch.put(key, value)
- }
- }
- db.write(batch)
- batch.close
- metrics.puts.inc(wrote)
- metrics.deletes.inc(deletes)
- deletesSinceLastCompaction += deletes
- }
-
- // NOTE(review): delegates to put(key, null), so each delete is also counted in metrics.puts.
- def delete(key: Array[Byte]) {
- metrics.deletes.inc
- put(key, null)
- }
-
- def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
- maybeCompact
- metrics.ranges.inc
- require(from != null && to != null, "Null bound not allowed.")
- new LevelDbRangeIterator(db.iterator, from, to)
- }
-
- def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
- maybeCompact
- metrics.alls.inc
- val iter = db.iterator()
- iter.seekToFirst()
- new LevelDbIterator(iter)
- }
-
- /**
- * Trigger a complete compaction on the LevelDB store if there have been at
- * least deleteCompactionThreshold deletes since the last compaction.
- * Only called from the read paths (get, range, all), not from writes.
- */
- def maybeCompact = {
- if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
- compact
- }
- }
-
- /**
- * Trigger a complete compaction of the LevelDB store.
- */
- def compact {
- // According to LevelDB's docs:
- // begin==NULL is treated as a key before all keys in the database.
- // end==NULL is treated as a key after all keys in the database.
- db.compactRange(null, null)
- deletesSinceLastCompaction = 0
- }
-
- // Only records the flush metric; no data is flushed here.
- def flush {
- metrics.flushes.inc
- // TODO can't find a flush for leveldb
- trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
- }
-
- def close() {
- trace("Closing.")
-
- db.close()
- }
-
- // Wraps a LevelDB DBIterator, recording bytes read for every returned entry.
- class LevelDbIterator(iter: DBIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
- private var open = true
- def close() = {
- open = false
- iter.close()
- }
- def remove() = iter.remove()
- def hasNext() = iter.hasNext()
- def next() = {
- if (!hasNext()) {
- throw new NoSuchElementException
- }
-
- val curr = iter.next
- val key = curr.getKey
- val value = curr.getValue
- metrics.bytesRead.inc(key.size)
- if (value != null) {
- metrics.bytesRead.inc(value.size)
- }
- new Entry(key, value)
- }
- // Safety net: closes an iterator the caller forgot to close when it is garbage collected.
- override def finalize() {
- if (open) {
- System.err.println("Leaked reference to level db iterator, forcing close.")
- close()
- }
- }
- }
-
- // Iterates from `from` (inclusive) up to `to`, using the configured comparator or,
- // if none is set, unsigned lexicographic order.
- // NOTE(review): compare(...) <= 0 makes the upper bound `to` inclusive.
- class LevelDbRangeIterator(iter: DBIterator, from: Array[Byte], to: Array[Byte]) extends LevelDbIterator(iter) {
- val comparator = if (options.comparator == null) lexicographic else options.comparator
- iter.seek(from)
- override def hasNext() = {
- iter.hasNext() && comparator.compare(iter.peekNext.getKey, to) <= 0
- }
- }
-
- /**
- * Compare two array lexicographically using unsigned byte arithmetic
- * (no key shortening: the separator/successor hooks return their input).
- */
- class LexicographicComparator extends DBComparator {
- def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
- val l = math.min(k1.length, k2.length)
- var i = 0
- while (i < l) {
- if (k1(i) != k2(i))
- return (k1(i) & 0xff) - (k2(i) & 0xff)
- i += 1
- }
- // okay prefixes are equal, the shorter array is less
- k1.length - k2.length
- }
- def name(): String = "lexicographic"
- def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
- def findShortSuccessor(key: Array[Byte]) = key
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
deleted file mode 100644
index 8949f2f..0000000
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
-
-/**
- * Counters for a LevelDbKeyValueStore. Every metric name is prefixed with
- * "<storeName>-" (see getPrefix). When no registry is supplied, the counters
- * are registered in a new MetricsRegistryMap.
- */
-class LevelDbKeyValueStoreMetrics(
- val storeName: String = "unknown",
- val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
-
- val gets = newCounter("gets")
- val ranges = newCounter("ranges")
- val alls = newCounter("alls")
- val puts = newCounter("puts")
- val deletes = newCounter("deletes")
- val flushes = newCounter("flushes")
- val bytesWritten = newCounter("bytes-written")
- val bytesRead = newCounter("bytes-read")
-
- override def getPrefix = storeName + "-"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index d3c1ae8..4ad6312 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -19,12 +19,9 @@
package org.apache.samza.storage.kv
-import java.nio.ByteBuffer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
import grizzled.slf4j.Logging
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.MessageCollector
/**
* A key/value store decorator that adds a changelog for any changes made to the underlying store
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
index ea56e8c..743151a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
@@ -19,10 +19,9 @@
package org.apache.samza.storage.kv
+import org.apache.samza.metrics.MetricsHelper
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
class LoggedStoreMetrics(
val storeName: String = "unknown",
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 00f9af3..4f48cf4 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -20,6 +20,7 @@
package org.apache.samza.storage.kv
import org.apache.samza.util.Util.notNull
+
import scala.collection.JavaConversions._
object NullSafeKeyValueStore {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 2d3b6e5..51ee68f 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -19,9 +19,8 @@
package org.apache.samza.storage.kv
-import java.util.Iterator
-import org.apache.samza.serializers._
import grizzled.slf4j.Logging
+import org.apache.samza.serializers._
/**
* A key-value store wrapper that handles serialization
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index 2ad21c8..841e4a2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -19,10 +19,9 @@
package org.apache.samza.storage.kv
+import org.apache.samza.metrics.MetricsHelper
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.MetricsHelper
class SerializedKeyValueStoreMetrics(
val storeName: String = "unknown",
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
deleted file mode 100644
index 4856be0..0000000
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-import java.util.Arrays
-import java.util.Random
-import scala.collection.JavaConversions._
-import org.iq80.leveldb.Options
-import org.junit.After
-import org.junit.Assert._
-import org.junit.Before
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-import org.apache.samza.serializers.Serde
-import org.scalatest.Assertions.intercept
-
-@RunWith(value = classOf[Parameterized])
-class TestKeyValueStores(typeOfStore: String) {
- import TestKeyValueStores._
-
- val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
- val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
- var store: KeyValueStore[Array[Byte], Array[Byte]] = null
- var cache = false
- var serde = false
-
- @Before
- def setup() {
- dir.mkdirs()
- val leveldb = new LevelDbKeyValueStore(dir, new Options)
- val passThroughSerde = new Serde[Array[Byte]] {
- def toBytes(obj: Array[Byte]) = obj
- def fromBytes(bytes: Array[Byte]) = bytes
- }
- store = if ("cache".equals(typeOfStore)) {
- cache = true
- new CachedStore(leveldb, CacheSize, BatchSize)
- } else if ("serde".equals(typeOfStore)) {
- serde = true
- new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde)
- } else if ("cache-and-serde".equals(typeOfStore)) {
- val serializedStore = new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde)
- serde = true
- cache = true
- new CachedStore(serializedStore, CacheSize, BatchSize)
- } else {
- leveldb
- }
-
- store = new NullSafeKeyValueStore(store)
- }
-
- @After
- def teardown() {
- store.close
- for (file <- dir.listFiles) // NOTE(review): only top-level entries are deleted; a non-empty subdirectory would survive
- file.delete()
- dir.delete()
- }
-
- @Test
- def getNonExistantIsNull() {
- assertNull(store.get(b("hello")))
- }
-
- @Test
- def putAndGet() {
- store.put(b("k"), b("v"))
- assertTrue(Arrays.equals(b("v"), store.get(b("k"))))
- }
-
- @Test
- def doublePutAndGet() {
- val k = b("k2")
- store.put(k, b("v1"))
- store.put(k, b("v2"))
- store.put(k, b("v3"))
- assertTrue(Arrays.equals(b("v3"), store.get(k)))
- }
-
- @Test
- def testNullsWithSerde() {
- if (serde) {
- val a = b("a")
- val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG) // NOTE(review): never read below
- val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG) // NOTE(review): never read below
-
- intercept[NullPointerException] { store.get(null) }
- intercept[NullPointerException] { store.delete(null) }
- intercept[NullPointerException] { store.put(null, a) }
- intercept[NullPointerException] { store.put(a, null) }
- intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
- intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
- intercept[NullPointerException] { store.range(a, null) }
- intercept[NullPointerException] { store.range(null, a) }
- }
- }
-
- @Test
- def testPutAll() {
- // Use CacheSize - 1 so we fully fill the cache, but don't write any data
- // out. Our check (below) uses == for cached entries, and using
- // numEntries >= CacheSize would result in the LRU cache dropping some
- // entries. The result would be that we get the correct byte array back
- // from the cache's underlying store (leveldb), but that == would fail.
- val numEntries = CacheSize - 1
- val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
- store.putAll(entries)
- if (cache) {
- assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue))
- } else {
- assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue)))
- }
- }
-
- @Test
- def testIterateAll() {
- for (letter <- letters)
- store.put(b(letter.toString), b(letter.toString))
- val iter = store.all
- checkRange(letters, iter)
- iter.close()
- }
-
- @Test
- def testRange() {
- val from = 5
- val to = 20
- for (letter <- letters)
- store.put(b(letter.toString), b(letter.toString))
- val iter = store.range(b(letters(from)), b(letters(to)))
- checkRange(letters.slice(from, to + 1), iter)
- iter.close()
- }
-
- @Test
- def testDelete() {
- val a = b("a")
- assertNull(store.get(a))
- store.put(a, a)
- assertTrue(Arrays.equals(a, store.get(a)))
- store.delete(a)
- assertNull(store.get(a))
- }
-
- @Test
- def testSimpleScenario() {
- val vals = letters.map(b(_))
- for (v <- vals) {
- assertNull(store.get(v))
- store.put(v, v)
- assertTrue(Arrays.equals(v, store.get(v)))
- }
- vals.foreach(v => assertTrue(Arrays.equals(v, store.get(v))))
- vals.foreach(v => store.delete(v))
- vals.foreach(v => assertNull(store.get(v)))
- }
-
- /**
- * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
- * implementation. The issue is that it doesn't work. More specifically,
- * creating a DoubleLinkedList from an existing list does not update the
- * "prev" field of the existing list's head to point to the new head. As a
- * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
- * Samza gets around this by manually updating the field itself. See SAMZA-80
- * for details.
- *
- * This issue is exposed in Samza's KV cache implementation, which uses
- * DoubleLinkedList, so all comments in this method are discussing the cached
- * implementation, but the test is still useful as a sanity check for
- * non-cached stores.
- */
- @Test
- def testBrokenScalaDoubleLinkedList() {
- val something = b("")
- val keys = letters
- .map(b(_))
- .toArray
-
- // Load the cache to capacity.
- letters
- .slice(0, TestKeyValueStores.CacheSize)
- .map(b(_))
- .foreach(store.put(_, something))
-
- // Now keep everything in the cache, but with an empty dirty list.
- store.flush
-
- // Dirty list is now empty, and every CacheEntry has dirty=null.
-
- // Corrupt the dirty list by creating two dirty lists that toggle back and
- // forth depending on whether the last dirty write was to 1 or 0. The trick
- // here is that every element in the cache is treated as the "head" of the
- // DoubleLinkedList (prev==null), even though it's not necessarily. Thus,
- // you can end up with multiple nodes each having their own version of the
- // dirty list with different elements in them.
- store.put(keys(1), something)
- store.put(keys(0), something)
- store.put(keys(1), something)
- store.flush
- // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
- store.put(keys(0), something)
- // The dirty list now has 0 and 1, but 1's dirty field is null in the
- // cache because it was just flushed.
-
- // Get rid of 1 from the cache by reading every other element, and then
- // putting one new element.
- letters
- .slice(2, TestKeyValueStores.CacheSize)
- .map(b(_))
- .foreach(store.get(_))
- store.put(keys(TestKeyValueStores.CacheSize), something)
-
- // Now try and trigger an NPE since the dirty list has an element (1)
- // that's no longer in the cache.
- store.flush
- }
-
- /**
- * A little test that tries to simulate a few common patterns:
- * read-modify-write, and do-some-stuff-then-delete (windowing).
- */
- @Test
- def testRandomReadWriteRemove() {
- // Make test deterministic by seeding the random number generator.
- val rand = new Random(12345)
- val keys = letters
- .map(b(_))
- .toArray
-
- // Map from letter to key byte array used for letter, and expected value.
- // We have to go through some acrobatics here since Java's byte array uses
- // object identity for .equals. Two byte arrays can have identical byte
- // elements, but not be equal.
- var expected = Map[String, (Array[Byte], String)]()
-
- (0 until 100).foreach(loop => {
- (0 until 30).foreach(i => {
- val idx = rand.nextInt(keys.length)
- val randomValue = letters(rand.nextInt(keys.length))
- val key = keys(idx)
- val currentVal = store.get(key) // NOTE(review): result is never read; only the get call is exercised
- store.put(key, b(randomValue))
- expected += letters(idx) -> (key, randomValue)
- })
-
- for ((k, v) <- expected) {
- val bytes = store.get(v._1)
- assertNotNull(bytes)
- assertEquals(v._2, new String(bytes, "UTF-8"))
- }
-
- val iterator = store.all
-
- while (iterator.hasNext) {
- val key = iterator.next.getKey
- store.delete(key)
- expected -= new String(key, "UTF-8")
- }
-
- iterator.close
-
- assertEquals(0, expected.size)
- })
- }
-
- def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) {
- for (v <- vals) {
- assertTrue(iter.hasNext)
- val entry = iter.next()
- assertEquals(v, s(entry.getKey))
- assertEquals(v, s(entry.getValue))
- }
- assertFalse(iter.hasNext)
- intercept[NoSuchElementException] { iter.next() }
- }
-
- /**
- * Convert string to byte array using the platform default charset
- */
- def b(s: String) =
- s.getBytes
-
- /**
- * Convert byte array to string using the platform default charset
- */
- def s(b: Array[Byte]) =
- new String(b)
-}
-
-object TestKeyValueStores {
- val CacheSize = 10 // cache size passed to CachedStore in setup()
- val BatchSize = 5 // batch size passed to CachedStore in setup()
- @Parameters // one test run per store configuration that setup() understands
- def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("cache"), Array("serde"), Array("cache-and-serde"), Array("leveldb"))
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 0077af0..c0ac5dd 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -26,12 +26,10 @@ import org.apache.samza.container.SamzaContainerContext
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.kv.KeyValueStorageEngine
-import org.apache.samza.storage.kv.KeyValueStorageEngineFactory
import org.apache.samza.storage.StorageEngineFactory
import org.apache.samza.task.ReadableCollector
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Util
-import org.apache.samza.serializers.Serde
import org.apache.samza.serializers.ByteSerde
import org.apache.samza.Partition
import org.apache.samza.SamzaException
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/46bc23b9/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 6ba6280..db5c32b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-include 'samza-api', 'samza-core', 'samza-kafka', 'samza-kv', 'samza-serializers', 'samza-shell', 'samza-yarn', 'samza-test'
+include 'samza-api', 'samza-core', 'samza-kafka', 'samza-kv', 'samza-kv-inmemory', 'samza-kv-leveldb', 'samza-serializers', 'samza-shell', 'samza-yarn', 'samza-test'
rootProject.children.each {
if (it.name != 'samza-api' && it.name != 'samza-shell') {