You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:36 UTC
[20/50] [abbrv] samza git commit: SAMZA-812: CachedStore flush too
often
SAMZA-812: CachedStore flush too often
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8c7e2eb0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8c7e2eb0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8c7e2eb0
Branch: refs/heads/samza-sql
Commit: 8c7e2eb04c783e1f1cb7234c12b39e29cf68c903
Parents: 22a1d6f
Author: Tommy Becker <tw...@gmail.com>
Authored: Wed Nov 11 09:01:08 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Nov 11 09:01:08 2015 -0800
----------------------------------------------------------------------
.../apache/samza/storage/kv/CachedStore.scala | 18 ++++++++-----
.../samza/storage/kv/TestCachedStore.scala | 27 +++++++++++++++++++-
2 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8c7e2eb0/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 1112350..9a5b2d5 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -62,14 +62,18 @@ class CachedStore[K, V](
/** an lru cache of values that holds cacheEntries and calls flush() if necessary when discarding */
private val cache = new java.util.LinkedHashMap[K, CacheEntry[K, V]]((cacheSize * 1.2).toInt, 1.0f, true) {
override def removeEldestEntry(eldest: java.util.Map.Entry[K, CacheEntry[K, V]]): Boolean = {
- val entry = eldest.getValue
- // if this entry hasn't been written out yet, flush it and all other dirty keys
- if (entry.dirty != null) {
- debug("Found a dirty entry. Flushing.")
-
- flush()
+ val evict = super.size > cacheSize
+ // We need backwards compatibility with the previous broken flushing behavior for array keys.
+ if (evict || hasArrayKeys) {
+ val entry = eldest.getValue
+ // if this entry hasn't been written out yet, flush it and all other dirty keys
+ if (entry.dirty != null) {
+ debug("Found a dirty entry. Flushing.")
+
+ flush()
+ }
}
- super.size > cacheSize
+ evict
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8c7e2eb0/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
index cc9c9f3..198720c 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
@@ -33,6 +33,8 @@ class TestCachedStore {
assertFalse(store.hasArrayKeys)
store.put("test1-key".getBytes("UTF-8"), "test1-value".getBytes("UTF-8"))
+ // Ensure we preserve old, broken flushing behavior for array keys
+ verify(kv).flush();
assertTrue(store.hasArrayKeys)
}
@@ -88,4 +90,27 @@ class TestCachedStore {
assertNull(kv.get(keys.get(1)))
assertNull(store.get(keys.get(1)))
}
-}
\ No newline at end of file
+
+ @Test
+ def testFlushing() {
+ val kv = mock(classOf[KeyValueStore[String, String]])
+ val store = new CachedStore[String, String](kv, 4, 4)
+
+ val keys = Arrays.asList("test1-key",
+ "test2-key",
+ "test3-key",
+ "test4-key")
+ val values = Arrays.asList("test1-value",
+ "test2-value",
+ "test3-value",
+ "test4-value")
+
+ for (i <- 0 until 3) {
+ store.put(keys.get(i), values.get(i))
+ }
+
+ verify(kv, never()).flush()
+ store.put(keys.get(3), values.get(3));
+ verify(kv).flush();
+ }
+}