You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) was not preserved in this text copy.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:36 UTC

[20/50] [abbrv] samza git commit: SAMZA-812: CachedStore flush too often

SAMZA-812: CachedStore flush too often


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8c7e2eb0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8c7e2eb0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8c7e2eb0

Branch: refs/heads/samza-sql
Commit: 8c7e2eb04c783e1f1cb7234c12b39e29cf68c903
Parents: 22a1d6f
Author: Tommy Becker <tw...@gmail.com>
Authored: Wed Nov 11 09:01:08 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Nov 11 09:01:08 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/storage/kv/CachedStore.scala   | 18 ++++++++-----
 .../samza/storage/kv/TestCachedStore.scala      | 27 +++++++++++++++++++-
 2 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8c7e2eb0/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 1112350..9a5b2d5 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -62,14 +62,18 @@ class CachedStore[K, V](
   /** an lru cache of values that holds cacheEntries and calls flush() if necessary when discarding */
   private val cache = new java.util.LinkedHashMap[K, CacheEntry[K, V]]((cacheSize * 1.2).toInt, 1.0f, true) {
     override def removeEldestEntry(eldest: java.util.Map.Entry[K, CacheEntry[K, V]]): Boolean = {
-      val entry = eldest.getValue
-      // if this entry hasn't been written out yet, flush it and all other dirty keys
-      if (entry.dirty != null) {
-        debug("Found a dirty entry. Flushing.")
-
-        flush()
+      val evict = super.size > cacheSize
+      // We need backwards compatibility with the previous broken flushing behavior for array keys.
+      if (evict || hasArrayKeys) {
+        val entry = eldest.getValue
+        // if this entry hasn't been written out yet, flush it and all other dirty keys
+        if (entry.dirty != null) {
+          debug("Found a dirty entry. Flushing.")
+
+          flush()
+        }
       }
-      super.size > cacheSize
+      evict
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/8c7e2eb0/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
index cc9c9f3..198720c 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
@@ -33,6 +33,8 @@ class TestCachedStore {
 
     assertFalse(store.hasArrayKeys)
     store.put("test1-key".getBytes("UTF-8"), "test1-value".getBytes("UTF-8"))
+    // Ensure we preserve old, broken flushing behavior for array keys
+    verify(kv).flush();
     assertTrue(store.hasArrayKeys)
   }
 
@@ -88,4 +90,27 @@ class TestCachedStore {
     assertNull(kv.get(keys.get(1)))
     assertNull(store.get(keys.get(1)))
   }
-}
\ No newline at end of file
+
+  @Test
+  def testFlushing() {
+    val kv = mock(classOf[KeyValueStore[String, String]])
+    val store = new CachedStore[String, String](kv, 4, 4)
+
+    val keys = Arrays.asList("test1-key",
+      "test2-key",
+      "test3-key",
+      "test4-key")
+    val values = Arrays.asList("test1-value",
+      "test2-value",
+      "test3-value",
+      "test4-value")
+
+    for (i <- 0 until 3) {
+      store.put(keys.get(i), values.get(i))
+    }
+
+    verify(kv, never()).flush()
+    store.put(keys.get(3), values.get(3));
+    verify(kv).flush();
+  }
+}