You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/02 20:08:20 UTC

samza git commit: SAMZA-439; fix stack overflow in CachedStore

Repository: samza
Updated Branches:
  refs/heads/master c3b469e0a -> f2fd9aaab


SAMZA-439; fix stack overflow in CachedStore


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f2fd9aaa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f2fd9aaa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f2fd9aaa

Branch: refs/heads/master
Commit: f2fd9aaab2dbd2a1508c498a4071f79e29102105
Parents: c3b469e
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Mon Feb 2 11:08:13 2015 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Feb 2 11:08:13 2015 -0800

----------------------------------------------------------------------
 .../org/apache/samza/storage/kv/CachedStore.scala      | 12 +++++++-----
 .../apache/samza/storage/kv/TestKeyValueStores.scala   | 13 ++++++++++---
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f2fd9aaa/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 1fa96ba..1971b1f 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -20,8 +20,8 @@
 package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Logging
-
 import scala.collection._
+import java.util.Arrays
 
 /**
  * A write-behind caching layer around the leveldb store. The purpose of this cache is three-fold:
@@ -150,13 +150,15 @@ class CachedStore[K, V](
     metrics.flushes.inc
 
     // write out the contents of the dirty list oldest first
-    val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
-    for (k <- this.dirty.reverse) {
+    val batch = new Array[Entry[K, V]](this.dirtyCount)
+    var pos : Int = this.dirtyCount - 1;
+    for (k <- this.dirty) {
       val entry = this.cache.get(k)
       entry.dirty = null // not dirty any more
-      batch.add(new Entry(k, entry.value))
+      batch(pos) = new Entry(k, entry.value)
+      pos -= 1
     }
-    store.putAll(batch)
+    store.putAll(Arrays.asList(batch : _*))
     store.flush
     metrics.flushBatchSize.inc(batch.size)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f2fd9aaa/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 2082610..f592d8e 100644
--- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -112,6 +112,13 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   }
 
   @Test
+  def putStessTest() {
+    for( a <- 0 to 1900000){
+        store.put(b(a+"k"), b("v"))
+      }
+  }
+
+  @Test
   def doublePutAndGet() {
     val k = b("k2")
     store.put(k, b("v1"))
@@ -252,7 +259,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
             .slice(2, TestKeyValueStores.CacheSize)
             .map(b(_))
             .foreach(store.get(_))
-    store.put(keys(TestKeyValueStores.CacheSize), something)
+    store.put(keys(10), something)
 
     // Now try and trigger an NPE since the dirty list has an element (1) 
     // that's no longer in the cache.
@@ -339,8 +346,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
 }
 
 object TestKeyValueStores {
-  val CacheSize = 10
-  val BatchSize = 5
+  val CacheSize = 1000000
+  val BatchSize = 1000000
   @Parameters
   def parameters: java.util.Collection[Array[String]] = Arrays.asList(
       //LevelDB