You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/02 20:08:20 UTC
samza git commit: SAMZA-439; fix stack overflow in CachedStore
Repository: samza
Updated Branches:
refs/heads/master c3b469e0a -> f2fd9aaab
SAMZA-439; fix stack overflow in CachedStore
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f2fd9aaa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f2fd9aaa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f2fd9aaa
Branch: refs/heads/master
Commit: f2fd9aaab2dbd2a1508c498a4071f79e29102105
Parents: c3b469e
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Mon Feb 2 11:08:13 2015 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Feb 2 11:08:13 2015 -0800
----------------------------------------------------------------------
.../org/apache/samza/storage/kv/CachedStore.scala | 12 +++++++-----
.../apache/samza/storage/kv/TestKeyValueStores.scala | 13 ++++++++++---
2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f2fd9aaa/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 1fa96ba..1971b1f 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -20,8 +20,8 @@
package org.apache.samza.storage.kv
import org.apache.samza.util.Logging
-
import scala.collection._
+import java.util.Arrays
/**
* A write-behind caching layer around the leveldb store. The purpose of this cache is three-fold:
@@ -150,13 +150,15 @@ class CachedStore[K, V](
metrics.flushes.inc
// write out the contents of the dirty list oldest first
- val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
- for (k <- this.dirty.reverse) {
+ val batch = new Array[Entry[K, V]](this.dirtyCount)
+ var pos : Int = this.dirtyCount - 1;
+ for (k <- this.dirty) {
val entry = this.cache.get(k)
entry.dirty = null // not dirty any more
- batch.add(new Entry(k, entry.value))
+ batch(pos) = new Entry(k, entry.value)
+ pos -= 1
}
- store.putAll(batch)
+ store.putAll(Arrays.asList(batch : _*))
store.flush
metrics.flushBatchSize.inc(batch.size)
http://git-wip-us.apache.org/repos/asf/samza/blob/f2fd9aaa/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 2082610..f592d8e 100644
--- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -112,6 +112,13 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
}
@Test
+ def putStessTest() {
+ for( a <- 0 to 1900000){
+ store.put(b(a+"k"), b("v"))
+ }
+ }
+
+ @Test
def doublePutAndGet() {
val k = b("k2")
store.put(k, b("v1"))
@@ -252,7 +259,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
.slice(2, TestKeyValueStores.CacheSize)
.map(b(_))
.foreach(store.get(_))
- store.put(keys(TestKeyValueStores.CacheSize), something)
+ store.put(keys(10), something)
// Now try and trigger an NPE since the dirty list has an element (1)
// that's no longer in the cache.
@@ -339,8 +346,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
}
object TestKeyValueStores {
- val CacheSize = 10
- val BatchSize = 5
+ val CacheSize = 1000000
+ val BatchSize = 1000000
@Parameters
def parameters: java.util.Collection[Array[String]] = Arrays.asList(
//LevelDB