You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/06/15 03:12:23 UTC
git commit: Use EntryWeigher instead of Weigher to Measuring the
cache patch by Vijay; reviewed by Jonathan Ellis for CASSANDRA-4315
Updated Branches:
refs/heads/trunk 95b833d42 -> 8aaaacd09
Use EntryWeigher instead of Weigher to Measuring the cache
patch by Vijay; reviewed by Jonathan Ellis for CASSANDRA-4315
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8aaaacd0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8aaaacd0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8aaaacd0
Branch: refs/heads/trunk
Commit: 8aaaacd09d8e34c93271383a0c86ecce38ef1965
Parents: 95b833d
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Jun 14 18:10:42 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Jun 14 18:10:42 2012 -0700
----------------------------------------------------------------------
.../cassandra/cache/ConcurrentLinkedHashCache.java | 51 ++++++---------
.../cache/ConcurrentLinkedHashCacheProvider.java | 24 +------
.../apache/cassandra/cache/IRowCacheProvider.java | 4 +-
.../apache/cassandra/cache/SerializingCache.java | 23 ++++---
.../cassandra/cache/SerializingCacheProvider.java | 4 +-
.../org/apache/cassandra/db/RowIndexEntry.java | 8 ++-
.../org/apache/cassandra/service/CacheService.java | 34 ++++++++--
.../apache/cassandra/cache/CacheProviderTest.java | 7 ++-
8 files changed, 78 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
index 8f36b15..375a7d0 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.cache;
import java.util.Set;
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
-import com.googlecode.concurrentlinkedhashmap.Weigher;
+import org.github.jamm.MemoryMeter;
-import com.googlecode.concurrentlinkedhashmap.Weighers;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
/** Wrapper so CLHM can implement ICache interface.
* (this is what you get for making library classes final.) */
@@ -30,42 +30,20 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V>
{
public static final int DEFAULT_CONCURENCY_LEVEL = 64;
private final ConcurrentLinkedHashMap<K, V> map;
+ private static final MemoryMeter meter = new MemoryMeter().omitSharedBufferOverhead();
- public ConcurrentLinkedHashCache(ConcurrentLinkedHashMap<K, V> map)
+ private ConcurrentLinkedHashCache(ConcurrentLinkedHashMap<K, V> map)
{
this.map = map;
}
/**
- * Initialize a cache with weigher = Weighers.singleton() and initial capacity 0
- *
- * @param capacity cache weighted capacity
- *
- * @param <K> key type
- * @param <V> value type
- *
- * @return initialized cache
- */
- public static <K, V> ConcurrentLinkedHashCache<K, V> create(long capacity)
- {
- return create(capacity, Weighers.<V>singleton());
- }
-
- /**
- * Initialize a cache with initial capacity set to 0
- *
- * @param weightedCapacity cache weighted capacity
- * @param weigher The weigher to use
- *
- * @param <K> key type
- * @param <V> value type
- *
- * @return initialized cache
+ * Initialize a cache with initial capacity with weightedCapacity
*/
- public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity, Weigher<V> weigher)
+ public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity, EntryWeigher<K, V> entryWeiger)
{
ConcurrentLinkedHashMap<K, V> map = new ConcurrentLinkedHashMap.Builder<K, V>()
- .weigher(weigher)
+ .weigher(entryWeiger)
.maximumWeightedCapacity(weightedCapacity)
.concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
.build();
@@ -73,6 +51,19 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V>
return new ConcurrentLinkedHashCache<K, V>(map);
}
+ public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity)
+ {
+ return create(weightedCapacity, new EntryWeigher<K, V>()
+ {
+ public int weightOf(K key, V value)
+ {
+ long size = meter.measure(key) + meter.measure(value);
+ assert size < Integer.MAX_VALUE : "Serialized size cannot be more than 2GB/Integer.MAX_VALUE";
+ return (int) size;
+ }
+ });
+ }
+
public long capacity()
{
return map.capacity();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
index b66b8b3..e1e06ee 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
@@ -17,30 +17,10 @@
*/
package org.apache.cassandra.cache;
-import com.googlecode.concurrentlinkedhashmap.Weigher;
-import com.googlecode.concurrentlinkedhashmap.Weighers;
-
-import org.github.jamm.MemoryMeter;
-
public class ConcurrentLinkedHashCacheProvider implements IRowCacheProvider
{
- public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher)
- {
- return ConcurrentLinkedHashCache.create(capacity, useMemoryWeigher
- ? createMemoryWeigher()
- : Weighers.<IRowCacheEntry>singleton());
- }
-
- private static Weigher<IRowCacheEntry> createMemoryWeigher()
+ public ICache<RowCacheKey, IRowCacheEntry> create(long capacity)
{
- return new Weigher<IRowCacheEntry>()
- {
- final MemoryMeter meter = new MemoryMeter();
-
- public int weightOf(IRowCacheEntry value)
- {
- return (int) Math.min(meter.measure(value), Integer.MAX_VALUE);
- }
- };
+ return ConcurrentLinkedHashCache.create(capacity);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
index 2e3ff99..003bfae 100644
--- a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.cache;
-import org.apache.cassandra.db.ColumnFamily;
-
/**
* Provides cache objects with a requested capacity.
*/
public interface IRowCacheProvider
{
- public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher);
+ public ICache<RowCacheKey, IRowCacheEntry> create(long capacity);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 9e36a89..54067e3 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -22,9 +22,9 @@ import java.io.IOException;
import java.util.Set;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
import com.googlecode.concurrentlinkedhashmap.EvictionListener;
import com.googlecode.concurrentlinkedhashmap.Weigher;
-import com.googlecode.concurrentlinkedhashmap.Weighers;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
@@ -49,7 +49,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
private final ConcurrentLinkedHashMap<K, FreeableMemory> map;
private final ISerializer<V> serializer;
- public SerializingCache(long capacity, boolean useMemoryWeigher, ISerializer<V> serializer)
+ private SerializingCache(long capacity, Weigher<FreeableMemory> weigher, ISerializer<V> serializer)
{
this.serializer = serializer;
@@ -62,24 +62,29 @@ public class SerializingCache<K, V> implements ICache<K, V>
};
this.map = new ConcurrentLinkedHashMap.Builder<K, FreeableMemory>()
- .weigher(useMemoryWeigher
- ? createMemoryWeigher()
- : Weighers.<FreeableMemory>singleton())
+ .weigher(weigher)
.maximumWeightedCapacity(capacity)
.concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
.listener(listener)
.build();
}
- private static Weigher<FreeableMemory> createMemoryWeigher()
+ public static <K, V> SerializingCache<K, V> create(long weightedCapacity, Weigher<FreeableMemory> weigher, ISerializer<V> serializer)
{
- return new Weigher<FreeableMemory>()
+ return new SerializingCache<K, V>(weightedCapacity, weigher, serializer);
+ }
+
+ public static <K, V> SerializingCache<K, V> create(long weightedCapacity, ISerializer<V> serializer)
+ {
+ return create(weightedCapacity, new Weigher<FreeableMemory>()
{
public int weightOf(FreeableMemory value)
{
- return (int) Math.min(value.size(), Integer.MAX_VALUE);
+ long size = value.size();
+ assert size < Integer.MAX_VALUE : "Serialized size cannot be more than 2GB";
+ return (int) size;
}
- };
+ }, serializer);
}
private V deserialize(FreeableMemory mem)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index 6adcd89..c8d11d2 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -29,9 +29,9 @@ import org.apache.cassandra.net.MessagingService;
public class SerializingCacheProvider implements IRowCacheProvider
{
- public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher)
+ public ICache<RowCacheKey, IRowCacheEntry> create(long capacity)
{
- return new SerializingCache<RowCacheKey, IRowCacheEntry>(capacity, useMemoryWeigher, new RowCacheSerializer());
+ return SerializingCache.create(capacity, new RowCacheSerializer());
}
// Package protected for tests
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 0c50746..b7660e5 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -20,10 +20,8 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -43,6 +41,11 @@ public class RowIndexEntry
this.position = position;
}
+ public int serializedSize()
+ {
+ return TypeSizes.NATIVE.sizeof(position);
+ }
+
public static RowIndexEntry create(long position, DeletionInfo deletionInfo, ColumnIndex index)
{
if (index != null && index.columnsIndex != null && index.columnsIndex.size() > 1)
@@ -184,6 +187,7 @@ public class RowIndexEntry
return bloomFilter;
}
+ @Override
public int serializedSize()
{
TypeSizes typeSizes = TypeSizes.NATIVE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 77f8349..6c7b0a2 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -48,16 +47,18 @@ import org.apache.cassandra.io.sstable.SSTableReader.Operator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.github.jamm.MemoryMeter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
+
public class CacheService implements CacheServiceMBean
{
private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=Caches";
- public static final int AVERAGE_KEY_CACHE_ROW_SIZE = 48;
public static enum CacheType
{
@@ -117,7 +118,25 @@ public class CacheService implements CacheServiceMBean
// as values are constant size we can use singleton weigher
// where 48 = 40 bytes (average size of the key) + 8 bytes (size of value)
- ICache<KeyCacheKey, RowIndexEntry> kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / AVERAGE_KEY_CACHE_ROW_SIZE);
+ ICache<KeyCacheKey, RowIndexEntry> kc;
+ if (MemoryMeter.isInitialized())
+ {
+ kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity);
+ }
+ else
+ {
+ logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); KeyCache size in JVM Heap will not be calculated accurately. " +
+ "Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
+ /* We don't know the overhead size because memory meter is not enabled. */
+ EntryWeigher<KeyCacheKey, RowIndexEntry> weigher = new EntryWeigher<KeyCacheKey, RowIndexEntry>()
+ {
+ public int weightOf(KeyCacheKey key, RowIndexEntry entry)
+ {
+ return key.key.length + entry.serializedSize();
+ }
+ };
+ kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity, weigher);
+ }
AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer());
int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
@@ -143,7 +162,7 @@ public class CacheService implements CacheServiceMBean
long rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024;
// cache object
- ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true);
+ ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity);
AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
@@ -245,7 +264,7 @@ public class CacheService implements CacheServiceMBean
public long getKeyCacheCapacityInBytes()
{
- return keyCache.getCapacity() * AVERAGE_KEY_CACHE_ROW_SIZE;
+ return keyCache.getCapacity();
}
public long getKeyCacheCapacityInMB()
@@ -258,7 +277,8 @@ public class CacheService implements CacheServiceMBean
if (capacity < 0)
throw new RuntimeException("capacity should not be negative.");
- keyCache.setCapacity(capacity * 1024 * 1024 / 48);
+ long weightedCapacity = capacity * 1024 * 1024;
+ keyCache.setCapacity(MemoryMeter.isInitialized() ? weightedCapacity : (weightedCapacity / 48));
}
public long getRowCacheSize()
@@ -268,7 +288,7 @@ public class CacheService implements CacheServiceMBean
public long getKeyCacheSize()
{
- return keyCache.weightedSize() * AVERAGE_KEY_CACHE_ROW_SIZE;
+ return keyCache.weightedSize();
}
public void reduceCacheSizes()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 46164bf..8b4d17b 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -30,6 +30,9 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.ColumnFamily;
+
+import com.googlecode.concurrentlinkedhashmap.Weighers;
+
import static org.apache.cassandra.Util.column;
import static org.junit.Assert.*;
@@ -105,7 +108,7 @@ public class CacheProviderTest extends SchemaLoader
@Test
public void testHeapCache() throws InterruptedException
{
- ICache<String, IRowCacheEntry> cache = ConcurrentLinkedHashCache.create(CAPACITY);
+ ICache<String, IRowCacheEntry> cache = ConcurrentLinkedHashCache.create(CAPACITY, Weighers.<String, IRowCacheEntry>entrySingleton());
ColumnFamily cf = createCF();
simpleCase(cf, cache);
concurrentCase(cf, cache);
@@ -114,7 +117,7 @@ public class CacheProviderTest extends SchemaLoader
@Test
public void testSerializingCache() throws InterruptedException
{
- ICache<String, IRowCacheEntry> cache = new SerializingCache<String, IRowCacheEntry>(CAPACITY, false, new SerializingCacheProvider.RowCacheSerializer());
+ ICache<String, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<FreeableMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer());
ColumnFamily cf = createCF();
simpleCase(cf, cache);
concurrentCase(cf, cache);