You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/06 20:15:48 UTC

svn commit: r1089555 - in /cassandra/trunk: src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/util/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/cache/

Author: jbellis
Date: Wed Apr  6 18:15:47 2011
New Revision: 1089555

URL: http://svn.apache.org/viewvc?rev=1089555&view=rev
Log:
add SerializingCacheProvider
patch by Vijay and jbellis for CASSANDRA-1969

Added:
    cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
    cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java Wed Apr  6 18:15:47 2011
@@ -30,11 +30,6 @@ public class ConcurrentLinkedHashCache<K
         return new ConcurrentLinkedHashCache<K, V>(map);
     }
 
-    public void discard(K key)
-    {
-        remove(key);
-    }
-
     public int capacity()
     {
         return map.capacity();
@@ -79,4 +74,9 @@ public class ConcurrentLinkedHashCache<K
     {
         return map.keySet();
     }
+
+    public boolean isPutCopying()
+    {
+        return false; // not a copying cache: per ICache, callers must copy shared values (e.g. ByteBuffers) before put
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java Wed Apr  6 18:15:47 2011
@@ -0,0 +1,61 @@
+package org.apache.cassandra.cache;
+
+import java.io.IOException;
+
+import com.sun.jna.Memory;
+
+/**
+ * A JNA {@link Memory} block whose native allocation can be released explicitly via
+ * {@link #free()} instead of waiting for the garbage collector to finalize it.
+ */
+public class FreeableMemory extends Memory
+{
+    // NOTE(review): not read or written anywhere in this class; kept so subclasses that
+    // reference it still compile.
+    protected volatile boolean valid = true;
+
+    /**
+     * @param size number of bytes of native memory to allocate
+     */
+    public FreeableMemory(long size)
+    {
+        super(size);
+    }
+
+    /**
+     * Releases the native memory immediately. Freeing an already-freed block trips the
+     * assertion below when assertions are enabled.
+     */
+    public void free()
+    {
+        assert peer != 0;
+        super.finalize(); // calls free and sets peer to zero
+    }
+
+    /**
+     * avoid re-freeing already-freed memory
+     */
+    @Override
+    protected void finalize()
+    {
+        if (peer != 0)
+            super.finalize();
+    }
+
+    /**
+     * Reads one byte, refusing to touch a block that has already been freed.
+     *
+     * @param offset byte offset from the start of the block
+     * @return the byte at {@code offset}
+     * @throws IllegalStateException if {@link #free()} has already released this block
+     */
+    public byte getValidByte(long offset)
+    {
+        // A bare assert is compiled out without -ea; reading through a zero peer would then
+        // presumably dereference a raw low native address instead of failing cleanly.
+        // NOTE(review): this check does not close the race with a concurrent free().
+        if (peer == 0)
+            throw new IllegalStateException("Attempted to read freed memory at offset " + offset);
+        return super.getByte(offset);
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java Wed Apr  6 18:15:47 2011
@@ -24,4 +24,10 @@ public interface ICache<K, V>
     public void clear();
 
     public Set<K> keySet();
+
+    /**
+     * @return true if put() stores its own copy of the value (e.g. by serializing it), so later changes
+     * to the caller's object are not seen; otherwise the caller should copy shared values like Thrift ByteBuffers.
+     */
+    public boolean isPutCopying();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java Wed Apr  6 18:15:47 2011
@@ -144,4 +144,9 @@ public class InstrumentingCache<K, V> im
     {
         return map.keySet();
     }
+
+    public boolean isPutCopying()
+    {
+        return map.isPutCopying(); // copying behaviour is whatever the wrapped cache provides
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java Wed Apr  6 18:15:47 2011
@@ -0,0 +1,188 @@
+package org.apache.cassandra.cache;
+
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Set;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+import com.googlecode.concurrentlinkedhashmap.Weighers;
+
+import org.apache.cassandra.io.ICompactSerializer3;
+import org.apache.cassandra.io.util.MemoryInputStream;
+import org.apache.cassandra.io.util.MemoryOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes cache values off-heap.
+ *
+ * Each value is serialized with the supplied {@link ICompactSerializer3} into a {@link FreeableMemory}
+ * block sized by {@code serializedSize}; every get() deserializes a fresh copy. Blocks are freed
+ * when their entry is evicted, replaced or removed.
+ *
+ * NOTE(review): get() reads a block without reference counting, so a concurrent eviction or
+ * removal that frees the same block can race with deserialization; confirm before relying on
+ * this cache under concurrent writes.
+ */
+public class SerializingCache<K, V> implements ICache<K, V>
+{
+    private static final Logger logger = LoggerFactory.getLogger(SerializingCache.class);
+    private static final int DEFAULT_CONCURRENCY_LEVEL = 64;
+
+    private final ConcurrentLinkedHashMap<K, FreeableMemory> map;
+    private final ICompactSerializer3<V> serializer;
+
+    /**
+     * @param capacity maximum number of entries (every entry is given weight 1)
+     * @param serializer converts values to and from their off-heap form
+     */
+    public SerializingCache(int capacity, ICompactSerializer3<V> serializer)
+    {
+        this.serializer = serializer;
+
+        // release the native memory of entries the map drops to stay within capacity
+        EvictionListener<K, FreeableMemory> listener = new EvictionListener<K, FreeableMemory>()
+        {
+            public void onEviction(K k, FreeableMemory mem)
+            {
+                mem.free();
+            }
+        };
+        this.map = new ConcurrentLinkedHashMap.Builder<K, FreeableMemory>()
+                   .weigher(Weighers.<FreeableMemory>singleton())
+                   .initialCapacity(capacity)
+                   .maximumWeightedCapacity(capacity)
+                   .concurrencyLevel(DEFAULT_CONCURRENCY_LEVEL)
+                   .listener(listener)
+                   .build();
+    }
+
+    /**
+     * @return the deserialized value, or null if the block could not be read
+     */
+    private V deserialize(FreeableMemory mem)
+    {
+        try
+        {
+            return serializer.deserialize(new MemoryInputStream(mem));
+        }
+        catch (IOException e)
+        {
+            logger.debug("Cannot fetch in memory data, we will failback to read from disk ", e);
+            return null;
+        }
+    }
+
+    /**
+     * Copies {@code value} into a newly allocated off-heap block.
+     *
+     * @return the filled block, or null if native memory could not be allocated
+     * @throws IllegalArgumentException if the serialized form exceeds Integer.MAX_VALUE bytes
+     * @throws IOError if the serializer fails
+     */
+    private FreeableMemory serialize(V value)
+    {
+        long serializedSize = serializer.serializedSize(value);
+        if (serializedSize > Integer.MAX_VALUE)
+            throw new IllegalArgumentException("Unable to allocate " + serializedSize + " bytes");
+
+        FreeableMemory freeableMemory;
+        try
+        {
+            freeableMemory = new FreeableMemory(serializedSize);
+        }
+        catch (OutOfMemoryError e)
+        {
+            return null;
+        }
+
+        try
+        {
+            serializer.serialize(value, new DataOutputStream(new MemoryOutputStream(freeableMemory)));
+        }
+        catch (IOException e)
+        {
+            // don't leave the half-written block for the finalizer to reclaim
+            freeableMemory.free();
+            throw new IOError(e);
+        }
+        return freeableMemory;
+    }
+
+    public int capacity()
+    {
+        return map.capacity();
+    }
+
+    public void setCapacity(int capacity)
+    {
+        map.setCapacity(capacity);
+    }
+
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    public int size()
+    {
+        return map.size();
+    }
+
+    /**
+     * NOTE(review): presumably ConcurrentLinkedHashMap.clear() does not notify the eviction
+     * listener, in which case the cleared blocks are only released by FreeableMemory's
+     * finalizer; confirm.
+     */
+    public void clear()
+    {
+        map.clear();
+    }
+
+    public V get(Object key)
+    {
+        FreeableMemory mem = map.get(key);
+        if (mem == null)
+            return null;
+        return deserialize(mem);
+    }
+
+    /**
+     * Stores a serialized copy of {@code value}. If native memory cannot be allocated the value is
+     * not cached and any previous entry for {@code key} is removed, so a stale value is never
+     * returned in place of the one the caller tried to store.
+     */
+    public void put(K key, V value)
+    {
+        FreeableMemory mem = serialize(value);
+        if (mem == null)
+        {
+            // out of memory: never mind caching the new value, but don't keep serving the old one
+            remove(key);
+            return;
+        }
+
+        FreeableMemory old = map.put(key, mem);
+        if (old != null)
+            old.free();
+    }
+
+    public void remove(K key)
+    {
+        FreeableMemory mem = map.remove(key);
+        if (mem != null)
+            mem.free();
+    }
+
+    public Set<K> keySet()
+    {
+        return map.keySet();
+    }
+
+    public boolean isPutCopying()
+    {
+        return true;
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java Wed Apr  6 18:15:47 2011
@@ -0,0 +1,38 @@
+package org.apache.cassandra.cache;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+
+import com.sun.jna.Memory;
+
+/**
+ * Row cache provider that keeps rows off-heap in a {@link SerializingCache}; requires JNA.
+ */
+public class SerializingCacheProvider implements IRowCacheProvider
+{
+    /**
+     * @throws ConfigurationException if the JNA {@link Memory} class cannot be loaded
+     */
+    public SerializingCacheProvider() throws ConfigurationException
+    {
+        // Resolve a JNA class eagerly so a missing jar is reported here as a configuration error,
+        // rather than later as a NoClassDefFoundError from FreeableMemory (a Memory subclass).
+        try
+        {
+            Memory.class.getName();
+        }
+        catch (NoClassDefFoundError e)
+        {
+            throw new ConfigurationException("Cannot initialize SerializingCache without JNA in the class path");
+        }
+    }
+
+    /**
+     * @param capacity entry capacity passed to the new SerializingCache
+     */
+    public ICache<DecoratedKey, ColumnFamily> create(int capacity)
+    {
+        return new SerializingCache<DecoratedKey, ColumnFamily>(capacity, ColumnFamily.serializer());
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Apr  6 18:15:47 2011
@@ -223,7 +223,14 @@ public final class CFMetaData
         memtableThroughputInMb       = DEFAULT_MEMTABLE_THROUGHPUT_IN_MB;
         memtableOperationsInMillions = DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS;
         mergeShardsChance            = DEFAULT_MERGE_SHARDS_CHANCE;
-        rowCacheProvider             = FBUtilities.newCacheProvider(DEFAULT_ROW_CACHE_PROVIDER);
+        try
+        {
+            rowCacheProvider             = FBUtilities.newCacheProvider(DEFAULT_ROW_CACHE_PROVIDER);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new AssertionError(e); // the default provider should not error out
+        }
 
         // Defaults strange or simple enough to not need a DEFAULT_T for
         defaultValidator = BytesType.instance;
@@ -381,7 +388,17 @@ public final class CFMetaData
         if (cf.memtable_throughput_in_mb != null) { newCFMD.memSize(cf.memtable_throughput_in_mb); }
         if (cf.memtable_operations_in_millions != null) { newCFMD.memOps(cf.memtable_operations_in_millions); }
         if (cf.merge_shards_chance != null) { newCFMD.mergeShardsChance(cf.merge_shards_chance); }
-        if (cf.row_cache_provider != null) { newCFMD.rowCacheProvider(FBUtilities.newCacheProvider(cf.row_cache_provider.toString())); }
+        if (cf.row_cache_provider != null)
+        {
+            try
+            {
+                newCFMD.rowCacheProvider(FBUtilities.newCacheProvider(cf.row_cache_provider.toString()));
+            }
+            catch (ConfigurationException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
 
         return newCFMD.comment(cf.comment.toString())
                       .rowCacheSize(cf.row_cache_size)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Apr  6 18:15:47 2011
@@ -31,8 +31,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.ICompactSerializer3;
 
-public class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+public class ColumnFamilySerializer implements ICompactSerializer3<ColumnFamily>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilySerializer.class);
 
@@ -142,4 +143,9 @@ public class ColumnFamilySerializer impl
         cf.delete(input.readInt(), input.readLong());
         return cf;
     }
+
+    public long serializedSize(ColumnFamily cf)
+    {
+        return cf.serializedSize(); // SerializingCache allocates exactly this many bytes before calling serialize()
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Apr  6 18:15:47 2011
@@ -729,11 +729,18 @@ public class ColumnFamilyStore implement
         Memtable mt = getMemtableThreadSafe();
         boolean flushRequested = mt.isThresholdViolated();
         mt.put(key, columnFamily);
-        ColumnFamily cachedRow = getRawCachedRow(key);
-        if (cachedRow != null)
-            cachedRow.addAll(columnFamily);
-        writeStats.addNano(System.nanoTime() - start);
-        
+        if (rowCache.isPutCopying())
+        {
+            invalidateCachedRow(key); // a copying cache hands out copies, so it can't be merged into in place
+        }
+        else
+        {
+            ColumnFamily cachedRow = getRawCachedRow(key);
+            if (cachedRow != null)
+                cachedRow.addAll(columnFamily);
+        }
+        // record write latency on both paths, not only when the cached row is merged in place
+        writeStats.addNano(System.nanoTime() - start);
         return flushRequested ? mt : null;
     }
 
@@ -1087,12 +1094,15 @@ public class ColumnFamilyStore implement
             if (cached == null)
                 return null;
 
-            // make a deep copy of column data so we don't keep references to direct buffers, which
-            // would prevent munmap post-compaction.
-            for (IColumn column : cached.getSortedColumns())
+            if (!rowCache.isPutCopying())
             {
-                cached.remove(column.name());
-                cached.addColumn(column.localCopy(this));
+                // make a deep copy of column data so we don't keep references to direct buffers, which
+                // would prevent munmap post-compaction.
+                for (IColumn column : cached.getSortedColumns())
+                {
+                    cached.remove(column.name());
+                    cached.addColumn(column.localCopy(this));
+                }
             }
 
             // avoid keeping a permanent reference to the original key buffer

Added: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java Wed Apr  6 18:15:47 2011
@@ -0,0 +1,14 @@
+package org.apache.cassandra.io;
+
+/**
+ * An {@link ICompactSerializer2} that can also report, before writing, how many bytes
+ * {@code serialize} will produce for a given object.
+ */
+public interface ICompactSerializer3<T> extends ICompactSerializer2<T>
+{
+    /**
+     * @param t the object to measure
+     * @return the number of bytes {@code serialize(t, out)} writes for {@code t}
+     */
+    long serializedSize(T t);
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java Wed Apr  6 18:15:47 2011
@@ -0,0 +1,55 @@
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.cache.FreeableMemory;
+
+/**
+ * A {@link DataInput} that reads sequentially from a {@link FreeableMemory} block, starting at offset 0.
+ */
+public class MemoryInputStream extends AbstractDataInput implements DataInput, Closeable
+{
+    private final FreeableMemory mem;
+    private int position = 0;
+
+    public MemoryInputStream(FreeableMemory mem)
+    {
+        this.mem = mem;
+    }
+
+    /**
+     * @return the next byte as an unsigned value in 0-255
+     */
+    public int read() throws IOException
+    {
+        return mem.getValidByte(position++) & 0xFF;
+    }
+
+    protected void seekInternal(int pos)
+    {
+        position = pos;
+    }
+
+    protected int getPosition()
+    {
+        return position;
+    }
+
+    /**
+     * Advances the read position by {@code n} bytes without bounds checking.
+     *
+     * @return {@code n}, the number of bytes skipped, as the {@link DataInput#skipBytes} contract requires
+     */
+    public int skipBytes(int n) throws IOException
+    {
+        seekInternal(getPosition() + n);
+        return n;
+    }
+
+    public void close()
+    {
+        // do nothing.
+    }
+}
\ No newline at end of file

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java Wed Apr  6 18:15:47 2011
@@ -0,0 +1,42 @@
+package org.apache.cassandra.io.util;
+
+import java.io.OutputStream;
+
+import com.sun.jna.Memory;
+
+/**
+ * An {@link OutputStream} that writes bytes sequentially into a JNA {@link Memory} block,
+ * starting at offset 0.
+ */
+public class MemoryOutputStream extends OutputStream
+{
+    private final Memory mem;
+    private int position = 0;
+
+    /**
+     * @param mem destination block; it is not copied, so writes land directly in it
+     */
+    public MemoryOutputStream(Memory mem)
+    {
+        this.mem = mem;
+    }
+
+    /**
+     * Stores the low-order eight bits of {@code b} at the current offset, then advances the offset.
+     */
+    @Override
+    public void write(int b)
+    {
+        final byte lowByte = (byte) b;
+        mem.setByte(position, lowByte);
+        position += 1;
+    }
+
+    /**
+     * @return the number of bytes written so far, i.e. the offset of the next write
+     */
+    public int position()
+    {
+        return position;
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Apr  6 18:15:47 2011
@@ -655,18 +655,11 @@ public class FBUtilities
         return field;
     }
 
-    public static IRowCacheProvider newCacheProvider(String cache_provider)
+    public static IRowCacheProvider newCacheProvider(String cache_provider) throws ConfigurationException
     {
         if (!cache_provider.contains("."))
             cache_provider = "org.apache.cassandra.cache." + cache_provider;
-        try
-        {
-            return FBUtilities.construct(cache_provider, "row cache provider");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
+        return FBUtilities.construct(cache_provider, "row cache provider"); // ConfigurationException now propagates instead of being wrapped in RuntimeException
     }
 
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java Wed Apr  6 18:15:47 2011
@@ -85,4 +85,13 @@ public class CacheProviderTest extends S
         simpleCase(cf, cache);
         concurrentCase(cf, cache);
     }
+
+    @Test
+    public void testSerializingCache() throws InterruptedException
+    {
+        ICache<String, ColumnFamily> cache = new SerializingCache<String, ColumnFamily>(CAPACITY, ColumnFamily.serializer());
+        ColumnFamily cf = createCF();
+        simpleCase(cf, cache);
+        // concurrentCase(cf, cache); -- NOTE(review): disabled, presumably because get() can race with eviction/removal freeing the same off-heap block; confirm and re-enable once fixed
+    }
 }