You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/06/15 03:12:23 UTC

git commit: Use EntryWeigher instead of Weigher to measure the cache; patch by Vijay; reviewed by Jonathan Ellis for CASSANDRA-4315

Updated Branches:
  refs/heads/trunk 95b833d42 -> 8aaaacd09


Use EntryWeigher instead of Weigher to measure the cache
patch by Vijay; reviewed by Jonathan Ellis for CASSANDRA-4315


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8aaaacd0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8aaaacd0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8aaaacd0

Branch: refs/heads/trunk
Commit: 8aaaacd09d8e34c93271383a0c86ecce38ef1965
Parents: 95b833d
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Jun 14 18:10:42 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Jun 14 18:10:42 2012 -0700

----------------------------------------------------------------------
 .../cassandra/cache/ConcurrentLinkedHashCache.java |   51 ++++++---------
 .../cache/ConcurrentLinkedHashCacheProvider.java   |   24 +------
 .../apache/cassandra/cache/IRowCacheProvider.java  |    4 +-
 .../apache/cassandra/cache/SerializingCache.java   |   23 ++++---
 .../cassandra/cache/SerializingCacheProvider.java  |    4 +-
 .../org/apache/cassandra/db/RowIndexEntry.java     |    8 ++-
 .../org/apache/cassandra/service/CacheService.java |   34 ++++++++--
 .../apache/cassandra/cache/CacheProviderTest.java  |    7 ++-
 8 files changed, 78 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
index 8f36b15..375a7d0 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.cache;
 
 import java.util.Set;
 
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
-import com.googlecode.concurrentlinkedhashmap.Weigher;
+import org.github.jamm.MemoryMeter;
 
-import com.googlecode.concurrentlinkedhashmap.Weighers;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
 
 /** Wrapper so CLHM can implement ICache interface.
  *  (this is what you get for making library classes final.) */
@@ -30,42 +30,20 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V>
 {
     public static final int DEFAULT_CONCURENCY_LEVEL = 64;
     private final ConcurrentLinkedHashMap<K, V> map;
+    private static final MemoryMeter meter = new MemoryMeter().omitSharedBufferOverhead();
 
-    public ConcurrentLinkedHashCache(ConcurrentLinkedHashMap<K, V> map)
+    private ConcurrentLinkedHashCache(ConcurrentLinkedHashMap<K, V> map)
     {
         this.map = map;
     }
 
     /**
-     * Initialize a cache with weigher = Weighers.singleton() and initial capacity 0
-     *
-     * @param capacity cache weighted capacity
-     *
-     * @param <K> key type
-     * @param <V> value type
-     *
-     * @return initialized cache
-     */
-    public static <K, V> ConcurrentLinkedHashCache<K, V> create(long capacity)
-    {
-        return create(capacity, Weighers.<V>singleton());
-    }
-
-    /**
-     * Initialize a cache with initial capacity set to 0
-     *
-     * @param weightedCapacity cache weighted capacity
-     * @param weigher The weigher to use
-     *
-     * @param <K> key type
-     * @param <V> value type
-     *
-     * @return initialized cache
+     * Initialize a cache whose maximum weighted capacity is weightedCapacity
      */
-    public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity, Weigher<V> weigher)
+    public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity, EntryWeigher<K, V> entryWeiger)
     {
         ConcurrentLinkedHashMap<K, V> map = new ConcurrentLinkedHashMap.Builder<K, V>()
-                                            .weigher(weigher)
+                                            .weigher(entryWeiger)
                                             .maximumWeightedCapacity(weightedCapacity)
                                             .concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
                                             .build();
@@ -73,6 +51,19 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V>
         return new ConcurrentLinkedHashCache<K, V>(map);
     }
 
+    public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity)
+    {
+        return create(weightedCapacity, new EntryWeigher<K, V>()
+        {
+            public int weightOf(K key, V value)
+            {
+                long size = meter.measure(key) + meter.measure(value);
+                assert size < Integer.MAX_VALUE : "Serialized size cannot be more than 2GB/Integer.MAX_VALUE";
+                return (int) size;
+            }
+        });
+    }
+
     public long capacity()
     {
         return map.capacity();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
index b66b8b3..e1e06ee 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
@@ -17,30 +17,10 @@
  */
 package org.apache.cassandra.cache;
 
-import com.googlecode.concurrentlinkedhashmap.Weigher;
-import com.googlecode.concurrentlinkedhashmap.Weighers;
-
-import org.github.jamm.MemoryMeter;
-
 public class ConcurrentLinkedHashCacheProvider implements IRowCacheProvider
 {
-    public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher)
-    {
-        return ConcurrentLinkedHashCache.create(capacity, useMemoryWeigher
-                                                            ? createMemoryWeigher()
-                                                            : Weighers.<IRowCacheEntry>singleton());
-    }
-
-    private static Weigher<IRowCacheEntry> createMemoryWeigher()
+    public ICache<RowCacheKey, IRowCacheEntry> create(long capacity)
     {
-        return new Weigher<IRowCacheEntry>()
-        {
-            final MemoryMeter meter = new MemoryMeter();
-
-            public int weightOf(IRowCacheEntry value)
-            {
-                return (int) Math.min(meter.measure(value), Integer.MAX_VALUE);
-            }
-        };
+        return ConcurrentLinkedHashCache.create(capacity);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
index 2e3ff99..003bfae 100644
--- a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.cache;
 
-import org.apache.cassandra.db.ColumnFamily;
-
 /**
  * Provides cache objects with a requested capacity.
  */
 public interface IRowCacheProvider
 {
-    public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher);
+    public ICache<RowCacheKey, IRowCacheEntry> create(long capacity);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 9e36a89..54067e3 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -22,9 +22,9 @@ import java.io.IOException;
 import java.util.Set;
 
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
 import com.googlecode.concurrentlinkedhashmap.EvictionListener;
 import com.googlecode.concurrentlinkedhashmap.Weigher;
-import com.googlecode.concurrentlinkedhashmap.Weighers;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
@@ -49,7 +49,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
     private final ConcurrentLinkedHashMap<K, FreeableMemory> map;
     private final ISerializer<V> serializer;
 
-    public SerializingCache(long capacity, boolean useMemoryWeigher, ISerializer<V> serializer)
+    private SerializingCache(long capacity, Weigher<FreeableMemory> weigher, ISerializer<V> serializer)
     {
         this.serializer = serializer;
 
@@ -62,24 +62,29 @@ public class SerializingCache<K, V> implements ICache<K, V>
         };
 
         this.map = new ConcurrentLinkedHashMap.Builder<K, FreeableMemory>()
-                   .weigher(useMemoryWeigher
-                                ? createMemoryWeigher()
-                                : Weighers.<FreeableMemory>singleton())
+                   .weigher(weigher)
                    .maximumWeightedCapacity(capacity)
                    .concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
                    .listener(listener)
                    .build();
     }
 
-    private static Weigher<FreeableMemory> createMemoryWeigher()
+    public static <K, V> SerializingCache<K, V> create(long weightedCapacity, Weigher<FreeableMemory> weigher, ISerializer<V> serializer)
     {
-        return new Weigher<FreeableMemory>()
+        return new SerializingCache<K, V>(weightedCapacity, weigher, serializer);
+    }
+
+    public static <K, V> SerializingCache<K, V> create(long weightedCapacity, ISerializer<V> serializer)
+    {
+        return create(weightedCapacity, new Weigher<FreeableMemory>()
         {
             public int weightOf(FreeableMemory value)
             {
-                return (int) Math.min(value.size(), Integer.MAX_VALUE);
+                long size = value.size();
+                assert size < Integer.MAX_VALUE : "Serialized size cannot be more than 2GB";
+                return (int) size;
             }
-        };
+        }, serializer);
     }
 
     private V deserialize(FreeableMemory mem)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index 6adcd89..c8d11d2 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -29,9 +29,9 @@ import org.apache.cassandra.net.MessagingService;
 
 public class SerializingCacheProvider implements IRowCacheProvider
 {
-    public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher)
+    public ICache<RowCacheKey, IRowCacheEntry> create(long capacity)
     {
-        return new SerializingCache<RowCacheKey, IRowCacheEntry>(capacity, useMemoryWeigher, new RowCacheSerializer());
+        return SerializingCache.create(capacity, new RowCacheSerializer());
     }
 
     // Package protected for tests

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 0c50746..b7660e5 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -20,10 +20,8 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -43,6 +41,11 @@ public class RowIndexEntry
         this.position = position;
     }
 
+    public int serializedSize()
+    {
+        return TypeSizes.NATIVE.sizeof(position);
+    }
+
     public static RowIndexEntry create(long position, DeletionInfo deletionInfo, ColumnIndex index)
     {
         if (index != null && index.columnsIndex != null && index.columnsIndex.size() > 1)
@@ -184,6 +187,7 @@ public class RowIndexEntry
             return bloomFilter;
         }
 
+        @Override
         public int serializedSize()
         {
             TypeSizes typeSizes = TypeSizes.NATIVE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 77f8349..6c7b0a2 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -48,16 +47,18 @@ import org.apache.cassandra.io.sstable.SSTableReader.Operator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.github.jamm.MemoryMeter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
+
 public class CacheService implements CacheServiceMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
 
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=Caches";
-    public static final int AVERAGE_KEY_CACHE_ROW_SIZE = 48;
 
     public static enum CacheType
     {
@@ -117,7 +118,25 @@ public class CacheService implements CacheServiceMBean
 
         // as values are constant size we can use singleton weigher
         // where 48 = 40 bytes (average size of the key) + 8 bytes (size of value)
-        ICache<KeyCacheKey, RowIndexEntry> kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / AVERAGE_KEY_CACHE_ROW_SIZE);
+        ICache<KeyCacheKey, RowIndexEntry> kc;
+        if (MemoryMeter.isInitialized())
+        {
+            kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity);
+        }
+        else
+        {
+            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); KeyCache size in JVM Heap will not be calculated accurately. " +
+            		"Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
+            /* We don't know the overhead size because memory meter is not enabled. */
+            EntryWeigher<KeyCacheKey, RowIndexEntry> weigher = new EntryWeigher<KeyCacheKey, RowIndexEntry>()
+            {
+                public int weightOf(KeyCacheKey key, RowIndexEntry entry)
+                {
+                    return key.key.length + entry.serializedSize();
+                }
+            };
+            kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity, weigher);
+        }
         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer());
 
         int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
@@ -143,7 +162,7 @@ public class CacheService implements CacheServiceMBean
         long rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024;
 
         // cache object
-        ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true);
+        ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity);
         AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
 
         int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
@@ -245,7 +264,7 @@ public class CacheService implements CacheServiceMBean
 
     public long getKeyCacheCapacityInBytes()
     {
-        return keyCache.getCapacity() * AVERAGE_KEY_CACHE_ROW_SIZE;
+        return keyCache.getCapacity();
     }
 
     public long getKeyCacheCapacityInMB()
@@ -258,7 +277,8 @@ public class CacheService implements CacheServiceMBean
         if (capacity < 0)
             throw new RuntimeException("capacity should not be negative.");
 
-        keyCache.setCapacity(capacity * 1024 * 1024 / 48);
+        long weightedCapacity = capacity * 1024 * 1024;
+        keyCache.setCapacity(MemoryMeter.isInitialized() ? weightedCapacity : (weightedCapacity / 48));
     }
 
     public long getRowCacheSize()
@@ -268,7 +288,7 @@ public class CacheService implements CacheServiceMBean
 
     public long getKeyCacheSize()
     {
-        return keyCache.weightedSize() * AVERAGE_KEY_CACHE_ROW_SIZE;
+        return keyCache.weightedSize();
     }
 
     public void reduceCacheSizes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 46164bf..8b4d17b 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -30,6 +30,9 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamily;
+
+import com.googlecode.concurrentlinkedhashmap.Weighers;
+
 import static org.apache.cassandra.Util.column;
 import static org.junit.Assert.*;
 
@@ -105,7 +108,7 @@ public class CacheProviderTest extends SchemaLoader
     @Test
     public void testHeapCache() throws InterruptedException
     {
-        ICache<String, IRowCacheEntry> cache = ConcurrentLinkedHashCache.create(CAPACITY);
+        ICache<String, IRowCacheEntry> cache = ConcurrentLinkedHashCache.create(CAPACITY, Weighers.<String, IRowCacheEntry>entrySingleton());
         ColumnFamily cf = createCF();
         simpleCase(cf, cache);
         concurrentCase(cf, cache);
@@ -114,7 +117,7 @@ public class CacheProviderTest extends SchemaLoader
     @Test
     public void testSerializingCache() throws InterruptedException
     {
-        ICache<String, IRowCacheEntry> cache = new SerializingCache<String, IRowCacheEntry>(CAPACITY, false, new SerializingCacheProvider.RowCacheSerializer());
+        ICache<String, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<FreeableMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer());
         ColumnFamily cf = createCF();
         simpleCase(cf, cache);
         concurrentCase(cf, cache);