You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/15 00:06:46 UTC
git commit: SAMZA-346; move kv tests into samza-test module
Repository: incubator-samza
Updated Branches:
refs/heads/master b934aa873 -> 90d5a00fc
SAMZA-346; move kv tests into samza-test module
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/90d5a00f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/90d5a00f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/90d5a00f
Branch: refs/heads/master
Commit: 90d5a00fc3a729821410756db0754cdc2f327766
Parents: b934aa8
Author: Navina Ramesh <nr...@linkedin.com>
Authored: Tue Oct 14 15:06:33 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Oct 14 15:06:33 2014 -0700
----------------------------------------------------------------------
build.gradle | 4 +-
.../samza/storage/kv/TestKeyValueStores.scala | 344 -------------------
.../samza/storage/kv/TestKeyValueStores.scala | 344 +++++++++++++++++++
3 files changed, 346 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/90d5a00f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d05dc8d..1b37dbb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -339,11 +339,9 @@ project(":samza-kv-leveldb_$scalaVersion") {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile project(":samza-kv_$scalaVersion")
- compile project(":samza-kv-inmemory_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion"
testCompile "junit:junit:$junitVersion"
- testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
}
}
@@ -359,6 +357,7 @@ project(":samza-test_$scalaVersion") {
dependencies {
compile project(':samza-api')
+ compile project(":samza-kv-inmemory_$scalaVersion")
compile project(":samza-kv-leveldb_$scalaVersion")
compile project(":samza-core_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
@@ -369,6 +368,7 @@ project(":samza-test_$scalaVersion") {
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
testCompile "com.101tec:zkclient:$zkClientVersion"
testCompile project(":samza-kafka_$scalaVersion")
+ testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/90d5a00f/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
deleted file mode 100644
index eefe114..0000000
--- a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-import java.util.Arrays
-import java.util.Random
-
-import org.apache.samza.serializers.Serde
-import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
-import org.iq80.leveldb.Options
-import org.junit.After
-import org.junit.Assert._
-import org.junit.Before
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-import org.scalatest.Assertions.intercept
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Test suite to check different key value store operations
- * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / "inmemory")
- * @param storeConfig Defines whether we're using caching / serde / both / or none in front of the store
- */
-@RunWith(value = classOf[Parameterized])
-class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
- import TestKeyValueStores._
-
- val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
- val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
- var store: KeyValueStore[Array[Byte], Array[Byte]] = null
- var cache = false
- var serde = false
-
- @Before
- def setup() {
- val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) {
- dir.mkdirs()
- val leveldb = new LevelDbKeyValueStore(dir, new Options)
- leveldb
- } else if ("inmemory".equals(typeOfStore)) {
- val inmemoryDb = new InMemoryKeyValueStore
- inmemoryDb
- } else {
- throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
- }
-
- val passThroughSerde = new Serde[Array[Byte]] {
- def toBytes(obj: Array[Byte]) = obj
- def fromBytes(bytes: Array[Byte]) = bytes
- }
- store = if ("cache".equals(storeConfig)) {
- cache = true
- new CachedStore(kvStore, CacheSize, BatchSize)
- } else if ("serde".equals(storeConfig)) {
- serde = true
- new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
- } else if ("cache-and-serde".equals(storeConfig)) {
- val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
- serde = true
- cache = true
- new CachedStore(serializedStore, CacheSize, BatchSize)
- } else {
- kvStore
- }
-
- store = new NullSafeKeyValueStore(store)
- }
-
- @After
- def teardown() {
- store.close
- if (dir != null && dir.listFiles() != null) {
- for (file <- dir.listFiles)
- file.delete()
- dir.delete()
- }
- }
-
- @Test
- def getNonExistantIsNull() {
- assertNull(store.get(b("hello")))
- }
-
- @Test
- def putAndGet() {
- store.put(b("k"), b("v"))
- assertArrayEquals(b("v"), store.get(b("k")))
- }
-
- @Test
- def doublePutAndGet() {
- val k = b("k2")
- store.put(k, b("v1"))
- store.put(k, b("v2"))
- store.put(k, b("v3"))
- assertArrayEquals(b("v3"), store.get(k))
- }
-
- @Test
- def testNullsWithSerde() {
- if (serde) {
- val a = b("a")
- val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
- val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
-
- intercept[NullPointerException] { store.get(null) }
- intercept[NullPointerException] { store.delete(null) }
- intercept[NullPointerException] { store.put(null, a) }
- intercept[NullPointerException] { store.put(a, null) }
- intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
- intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
- intercept[NullPointerException] { store.range(a, null) }
- intercept[NullPointerException] { store.range(null, a) }
- }
- }
-
- @Test
- def testPutAll() {
- // Use CacheSize - 1 so we fully fill the cache, but don't write any data
- // out. Our check (below) uses == for cached entries, and using
- // numEntires >= CacheSize would result in the LRU cache dropping some
- // entries. The result would be that we get the correct byte array back
- // from the cache's underlying store (leveldb), but that == would fail.
- val numEntries = CacheSize - 1
- val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
- store.putAll(entries)
- if (cache) {
- assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue))
- } else {
- assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue)))
- }
- }
-
- @Test
- def testIterateAll() {
- for (letter <- letters)
- store.put(b(letter.toString), b(letter.toString))
- val iter = store.all
- checkRange(letters, iter)
- iter.close()
- }
-
- @Test
- def testRange() {
- val from = 5
- val to = 20
- for (letter <- letters)
- store.put(b(letter.toString), b(letter.toString))
-
- val iter = store.range(b(letters(from)), b(letters(to)))
- checkRange(letters.slice(from, to), iter)
- iter.close()
- }
-
- @Test
- def testDelete() {
- val a = b("a")
- assertNull(store.get(a))
- store.put(a, a)
- assertArrayEquals(a, store.get(a))
- store.delete(a)
- assertNull(store.get(a))
- }
-
- @Test
- def testSimpleScenario() {
- val vals = letters.map(b(_))
- for (v <- vals) {
- assertNull(store.get(v))
- store.put(v, v)
- assertArrayEquals(v, store.get(v))
- }
- vals.foreach(v => assertArrayEquals(v, store.get(v)))
- vals.foreach(v => store.delete(v))
- vals.foreach(v => assertNull(store.get(v)))
- }
-
- /**
- * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
- * implementation. The issue is that it doesn't work. More specifically,
- * creating a DoubleLinkedList from an existing list does not update the
- * "prev" field of the existing list's head to point to the new head. As a
- * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
- * Samza gets around this by manually updating the field itself. See SAMZA-80
- * for details.
- *
- * This issue is exposed in Samza's KV cache implementation, which uses
- * DoubleLinkedList, so all comments in this method are discussing the cached
- * implementation, but the test is still useful as a sanity check for
- * non-cached stores.
- */
- @Test
- def testBrokenScalaDoubleLinkedList() {
- val something = b("")
- val keys = letters
- .map(b(_))
- .toArray
-
- // Load the cache to capacity.
- letters
- .slice(0, TestKeyValueStores.CacheSize)
- .map(b(_))
- .foreach(store.put(_, something))
-
- // Now keep everything in the cache, but with an empty dirty list.
- store.flush
-
- // Dirty list is now empty, and every CacheEntry has dirty=null.
-
- // Corrupt the dirty list by creating two dirty lists that toggle back and
- // forth depending on whether the last dirty write was to 1 or 0. The trick
- // here is that every element in the cache is treated as the "head" of the
- // DoulbeLinkedList (prev==null), even though it's not necessarily. Thus,
- // You can end up with multiple nodes each having their own version of the
- // dirty list with different elements in them.
- store.put(keys(1), something)
- store.put(keys(0), something)
- store.put(keys(1), something)
- store.flush
- // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
- store.put(keys(0), something)
- // The dirty list now has 0 and 1, but 1's dirty field is null in the
- // cache because it was just flushed.
-
- // Get rid of 1 from the cache by reading every other element, and then
- // putting one new element.
- letters
- .slice(2, TestKeyValueStores.CacheSize)
- .map(b(_))
- .foreach(store.get(_))
- store.put(keys(TestKeyValueStores.CacheSize), something)
-
- // Now try and trigger an NPE since the dirty list has an element (1)
- // that's no longer in the cache.
- store.flush
- }
-
- /**
- * A little test that tries to simulate a few common patterns:
- * read-modify-write, and do-some-stuff-then-delete (windowing).
- */
- @Test
- def testRandomReadWriteRemove() {
- // Make test deterministic by seeding the random number generator.
- val rand = new Random(12345)
- val keys = letters
- .map(b(_))
- .toArray
-
- // Map from letter to key byte array used for letter, and expected value.
- // We have to go through some acrobatics here since Java's byte array uses
- // object identity for .equals. Two byte arrays can have identical byte
- // elements, but not be equal.
- var expected = Map[String, (Array[Byte], String)]()
-
- (0 until 100).foreach(loop => {
- (0 until 30).foreach(i => {
- val idx = rand.nextInt(keys.length)
- val randomValue = letters(rand.nextInt(keys.length))
- val key = keys(idx)
- val currentVal = store.get(key)
- store.put(key, b(randomValue))
- expected += letters(idx) -> (key, randomValue)
- })
-
- for ((k, v) <- expected) {
- val bytes = store.get(v._1)
- assertNotNull(bytes)
- assertEquals(v._2, new String(bytes, "UTF-8"))
- }
-
- // Iterating and making structural modifications (deletion) does not look right.
- // Separating these two steps
- val iterator = store.all
- val allKeys = new ArrayBuffer[Array[Byte]]()
-
- // First iterate
- while (iterator.hasNext) {
- allKeys += iterator.next.getKey
- }
- iterator.close
-
- // And now delete
- for (key <- allKeys) {
- store.delete(key)
- expected -= new String(key, "UTF-8")
- }
-
- assertEquals(0, expected.size)
- })
- }
-
- def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) {
- for (v <- vals) {
- assertTrue(iter.hasNext)
- val entry = iter.next()
- assertEquals(v, s(entry.getKey))
- assertEquals(v, s(entry.getValue))
- }
- assertFalse(iter.hasNext)
- intercept[NoSuchElementException] { iter.next() }
- }
-
- /**
- * Convert string to byte buffer
- */
- def b(s: String) =
- s.getBytes
-
- /**
- * Convert byte buffer to string
- */
- def s(b: Array[Byte]) =
- new String(b)
-}
-
-object TestKeyValueStores {
- val CacheSize = 10
- val BatchSize = 5
- @Parameters
- def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none"))
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/90d5a00f/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
new file mode 100644
index 0000000..eefe114
--- /dev/null
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.File
+import java.util.Arrays
+import java.util.Random
+
+import org.apache.samza.serializers.Serde
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
+import org.iq80.leveldb.Options
+import org.junit.After
+import org.junit.Assert._
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.scalatest.Assertions.intercept
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Test suite to check different key value store operations
+ * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / "inmemory")
+ * @param storeConfig Defines whether we're using caching / serde / both / or none in front of the store
+ */
+@RunWith(value = classOf[Parameterized])
+class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
+ import TestKeyValueStores._
+
+ val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
+ val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
+ var store: KeyValueStore[Array[Byte], Array[Byte]] = null
+ var cache = false
+ var serde = false
+
+ @Before
+ def setup() {
+ val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) {
+ dir.mkdirs()
+ val leveldb = new LevelDbKeyValueStore(dir, new Options)
+ leveldb
+ } else if ("inmemory".equals(typeOfStore)) {
+ val inmemoryDb = new InMemoryKeyValueStore
+ inmemoryDb
+ } else {
+ throw new IllegalArgumentException("Type of store undefined: " + typeOfStore)
+ }
+
+ val passThroughSerde = new Serde[Array[Byte]] {
+ def toBytes(obj: Array[Byte]) = obj
+ def fromBytes(bytes: Array[Byte]) = bytes
+ }
+ store = if ("cache".equals(storeConfig)) {
+ cache = true
+ new CachedStore(kvStore, CacheSize, BatchSize)
+ } else if ("serde".equals(storeConfig)) {
+ serde = true
+ new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+ } else if ("cache-and-serde".equals(storeConfig)) {
+ val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde)
+ serde = true
+ cache = true
+ new CachedStore(serializedStore, CacheSize, BatchSize)
+ } else {
+ kvStore
+ }
+
+ store = new NullSafeKeyValueStore(store)
+ }
+
+ @After
+ def teardown() {
+ store.close
+ if (dir != null && dir.listFiles() != null) {
+ for (file <- dir.listFiles)
+ file.delete()
+ dir.delete()
+ }
+ }
+
+ @Test
+ def getNonExistantIsNull() {
+ assertNull(store.get(b("hello")))
+ }
+
+ @Test
+ def putAndGet() {
+ store.put(b("k"), b("v"))
+ assertArrayEquals(b("v"), store.get(b("k")))
+ }
+
+ @Test
+ def doublePutAndGet() {
+ val k = b("k2")
+ store.put(k, b("v1"))
+ store.put(k, b("v2"))
+ store.put(k, b("v3"))
+ assertArrayEquals(b("v3"), store.get(k))
+ }
+
+ @Test
+ def testNullsWithSerde() {
+ if (serde) {
+ val a = b("a")
+ val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
+ val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
+
+ intercept[NullPointerException] { store.get(null) }
+ intercept[NullPointerException] { store.delete(null) }
+ intercept[NullPointerException] { store.put(null, a) }
+ intercept[NullPointerException] { store.put(a, null) }
+ intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
+ intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
+ intercept[NullPointerException] { store.range(a, null) }
+ intercept[NullPointerException] { store.range(null, a) }
+ }
+ }
+
+ @Test
+ def testPutAll() {
+ // Use CacheSize - 1 so we fully fill the cache, but don't write any data
+ // out. Our check (below) uses == for cached entries, and using
+ // numEntries >= CacheSize would result in the LRU cache dropping some
+ // entries. The result would be that we get the correct byte array back
+ // from the cache's underlying store (leveldb), but that == would fail.
+ val numEntries = CacheSize - 1
+ val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
+ store.putAll(entries)
+ if (cache) {
+ assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue))
+ } else {
+ assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue)))
+ }
+ }
+
+ @Test
+ def testIterateAll() {
+ for (letter <- letters)
+ store.put(b(letter.toString), b(letter.toString))
+ val iter = store.all
+ checkRange(letters, iter)
+ iter.close()
+ }
+
+ @Test
+ def testRange() {
+ val from = 5
+ val to = 20
+ for (letter <- letters)
+ store.put(b(letter.toString), b(letter.toString))
+
+ val iter = store.range(b(letters(from)), b(letters(to)))
+ checkRange(letters.slice(from, to), iter)
+ iter.close()
+ }
+
+ @Test
+ def testDelete() {
+ val a = b("a")
+ assertNull(store.get(a))
+ store.put(a, a)
+ assertArrayEquals(a, store.get(a))
+ store.delete(a)
+ assertNull(store.get(a))
+ }
+
+ @Test
+ def testSimpleScenario() {
+ val vals = letters.map(b(_))
+ for (v <- vals) {
+ assertNull(store.get(v))
+ store.put(v, v)
+ assertArrayEquals(v, store.get(v))
+ }
+ vals.foreach(v => assertArrayEquals(v, store.get(v)))
+ vals.foreach(v => store.delete(v))
+ vals.foreach(v => assertNull(store.get(v)))
+ }
+
+ /**
+ * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
+ * implementation. The issue is that it doesn't work. More specifically,
+ * creating a DoubleLinkedList from an existing list does not update the
+ * "prev" field of the existing list's head to point to the new head. As a
+ * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
+ * Samza gets around this by manually updating the field itself. See SAMZA-80
+ * for details.
+ *
+ * This issue is exposed in Samza's KV cache implementation, which uses
+ * DoubleLinkedList, so all comments in this method are discussing the cached
+ * implementation, but the test is still useful as a sanity check for
+ * non-cached stores.
+ */
+ @Test
+ def testBrokenScalaDoubleLinkedList() {
+ val something = b("")
+ val keys = letters
+ .map(b(_))
+ .toArray
+
+ // Load the cache to capacity.
+ letters
+ .slice(0, TestKeyValueStores.CacheSize)
+ .map(b(_))
+ .foreach(store.put(_, something))
+
+ // Now keep everything in the cache, but with an empty dirty list.
+ store.flush
+
+ // Dirty list is now empty, and every CacheEntry has dirty=null.
+
+ // Corrupt the dirty list by creating two dirty lists that toggle back and
+ // forth depending on whether the last dirty write was to 1 or 0. The trick
+ // here is that every element in the cache is treated as the "head" of the
+ // DoubleLinkedList (prev==null), even though it's not necessarily. Thus,
+ // you can end up with multiple nodes each having their own version of the
+ // dirty list with different elements in them.
+ store.put(keys(1), something)
+ store.put(keys(0), something)
+ store.put(keys(1), something)
+ store.flush
+ // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
+ store.put(keys(0), something)
+ // The dirty list now has 0 and 1, but 1's dirty field is null in the
+ // cache because it was just flushed.
+
+ // Get rid of 1 from the cache by reading every other element, and then
+ // putting one new element.
+ letters
+ .slice(2, TestKeyValueStores.CacheSize)
+ .map(b(_))
+ .foreach(store.get(_))
+ store.put(keys(TestKeyValueStores.CacheSize), something)
+
+ // Now try and trigger an NPE since the dirty list has an element (1)
+ // that's no longer in the cache.
+ store.flush
+ }
+
+ /**
+ * A little test that tries to simulate a few common patterns:
+ * read-modify-write, and do-some-stuff-then-delete (windowing).
+ */
+ @Test
+ def testRandomReadWriteRemove() {
+ // Make test deterministic by seeding the random number generator.
+ val rand = new Random(12345)
+ val keys = letters
+ .map(b(_))
+ .toArray
+
+ // Map from letter to key byte array used for letter, and expected value.
+ // We have to go through some acrobatics here since Java's byte array uses
+ // object identity for .equals. Two byte arrays can have identical byte
+ // elements, but not be equal.
+ var expected = Map[String, (Array[Byte], String)]()
+
+ (0 until 100).foreach(loop => {
+ (0 until 30).foreach(i => {
+ val idx = rand.nextInt(keys.length)
+ val randomValue = letters(rand.nextInt(keys.length))
+ val key = keys(idx)
+ val currentVal = store.get(key)
+ store.put(key, b(randomValue))
+ expected += letters(idx) -> (key, randomValue)
+ })
+
+ for ((k, v) <- expected) {
+ val bytes = store.get(v._1)
+ assertNotNull(bytes)
+ assertEquals(v._2, new String(bytes, "UTF-8"))
+ }
+
+ // Iterating and making structural modifications (deletion) does not look right.
+ // Separating these two steps
+ val iterator = store.all
+ val allKeys = new ArrayBuffer[Array[Byte]]()
+
+ // First iterate
+ while (iterator.hasNext) {
+ allKeys += iterator.next.getKey
+ }
+ iterator.close
+
+ // And now delete
+ for (key <- allKeys) {
+ store.delete(key)
+ expected -= new String(key, "UTF-8")
+ }
+
+ assertEquals(0, expected.size)
+ })
+ }
+
+ def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) {
+ for (v <- vals) {
+ assertTrue(iter.hasNext)
+ val entry = iter.next()
+ assertEquals(v, s(entry.getKey))
+ assertEquals(v, s(entry.getValue))
+ }
+ assertFalse(iter.hasNext)
+ intercept[NoSuchElementException] { iter.next() }
+ }
+
+ /**
+ * Convert string to byte buffer
+ */
+ def b(s: String) =
+ s.getBytes
+
+ /**
+ * Convert byte buffer to string
+ */
+ def s(b: Array[Byte]) =
+ new String(b)
+}
+
+object TestKeyValueStores {
+ val CacheSize = 10
+ val BatchSize = 5
+ @Parameters
+ def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none"))
+}