You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/06 20:15:48 UTC
svn commit: r1089555 - in /cassandra/trunk:
src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
src/java/org/apache/cassandra/io/util/ src/java/org/apache/cassan...
Author: jbellis
Date: Wed Apr 6 18:15:47 2011
New Revision: 1089555
URL: http://svn.apache.org/viewvc?rev=1089555&view=rev
Log:
add SerializingCacheProvider
patch by Vijay and jbellis for CASSANDRA-1969
Added:
cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java Wed Apr 6 18:15:47 2011
@@ -30,11 +30,6 @@ public class ConcurrentLinkedHashCache<K
return new ConcurrentLinkedHashCache<K, V>(map);
}
- public void discard(K key)
- {
- remove(key);
- }
-
public int capacity()
{
return map.capacity();
@@ -79,4 +74,9 @@ public class ConcurrentLinkedHashCache<K
{
return map.keySet();
}
+
+ public boolean isPutCopying()
+ {
+ return false;
+ }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java Wed Apr 6 18:15:47 2011
@@ -0,0 +1,37 @@
+package org.apache.cassandra.cache;
+
+import java.io.IOException;
+
+import com.sun.jna.Memory;
+
+public class FreeableMemory extends Memory
+{
+ protected volatile boolean valid = true;
+
+ public FreeableMemory(long size)
+ {
+ super(size);
+ }
+
+ public void free()
+ {
+ assert peer != 0;
+ super.finalize(); // calls free and sets peer to zero
+ }
+
+ /**
+ * avoid re-freeing already-freed memory
+ */
+ @Override
+ protected void finalize()
+ {
+ if (peer != 0)
+ super.finalize();
+ }
+
+ public byte getValidByte(long offset)
+ {
+ assert peer != 0;
+ return super.getByte(offset);
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java Wed Apr 6 18:15:47 2011
@@ -24,4 +24,10 @@ public interface ICache<K, V>
public void clear();
public Set<K> keySet();
+
+ /**
+ * @return true if the cache implementation inherently copies the cached values; otherwise,
+ * the caller should copy manually before caching shared values like Thrift ByteBuffers.
+ */
+ public boolean isPutCopying();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java Wed Apr 6 18:15:47 2011
@@ -144,4 +144,9 @@ public class InstrumentingCache<K, V> im
{
return map.keySet();
}
+
+ public boolean isPutCopying()
+ {
+ return map.isPutCopying();
+ }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java Wed Apr 6 18:15:47 2011
@@ -0,0 +1,149 @@
+package org.apache.cassandra.cache;
+
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Set;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+import com.googlecode.concurrentlinkedhashmap.Weighers;
+
+import org.apache.cassandra.io.ICompactSerializer3;
+import org.apache.cassandra.io.util.MemoryInputStream;
+import org.apache.cassandra.io.util.MemoryOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes cache values off-heap.
+ */
+public class SerializingCache<K, V> implements ICache<K, V>
+{
+ private static final Logger logger = LoggerFactory.getLogger(SerializingCache.class);
+ private static final int DEFAULT_CONCURENCY_LEVEL = 64;
+
+ private final ConcurrentLinkedHashMap<K, FreeableMemory> map;
+ private final ICompactSerializer3<V> serializer;
+
+ public SerializingCache(int capacity, ICompactSerializer3<V> serializer)
+ {
+ this.serializer = serializer;
+
+ EvictionListener<K,FreeableMemory> listener = new EvictionListener<K, FreeableMemory>()
+ {
+ public void onEviction(K k, FreeableMemory mem)
+ {
+ mem.free();
+ }
+ };
+ this.map = new ConcurrentLinkedHashMap.Builder<K, FreeableMemory>()
+ .weigher(Weighers.<FreeableMemory>singleton())
+ .initialCapacity(capacity)
+ .maximumWeightedCapacity(capacity)
+ .concurrencyLevel(DEFAULT_CONCURENCY_LEVEL)
+ .listener(listener)
+ .build();
+ }
+
+ private V deserialize(FreeableMemory mem)
+ {
+ try
+ {
+ return serializer.deserialize(new MemoryInputStream(mem));
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot fetch in memory data, we will failback to read from disk ", e);
+ return null;
+ }
+ }
+
+ private FreeableMemory serialize(V value)
+ {
+ long serializedSize = serializer.serializedSize(value);
+ if (serializedSize > Integer.MAX_VALUE)
+ throw new IllegalArgumentException("Unable to allocate " + serializedSize + " bytes");
+
+ FreeableMemory freeableMemory;
+ try
+ {
+ freeableMemory = new FreeableMemory(serializedSize);
+ }
+ catch (OutOfMemoryError e)
+ {
+ return null;
+ }
+
+ try
+ {
+ serializer.serialize(value, new DataOutputStream(new MemoryOutputStream(freeableMemory)));
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ return freeableMemory;
+ }
+
+ public int capacity()
+ {
+ return map.capacity();
+ }
+
+ public void setCapacity(int capacity)
+ {
+ map.setCapacity(capacity);
+ }
+
+ public boolean isEmpty()
+ {
+ return map.isEmpty();
+ }
+
+ public int size()
+ {
+ return map.size();
+ }
+
+ public void clear()
+ {
+ map.clear();
+ }
+
+ public V get(Object key)
+ {
+ FreeableMemory mem = map.get(key);
+ if (mem == null)
+ return null;
+ return deserialize(mem);
+ }
+
+ public void put(K key, V value)
+ {
+ FreeableMemory mem = serialize(value);
+ if (mem == null)
+ return; // out of memory. never mind.
+
+ FreeableMemory old = map.put(key, mem);
+ if (old != null)
+ old.free();
+ }
+
+ public void remove(K key)
+ {
+ FreeableMemory mem = map.remove(key);
+ if (mem != null)
+ mem.free();
+ }
+
+ public Set<K> keySet()
+ {
+ return map.keySet();
+ }
+
+ public boolean isPutCopying()
+ {
+ return true;
+ }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java Wed Apr 6 18:15:47 2011
@@ -0,0 +1,27 @@
+package org.apache.cassandra.cache;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+
+import com.sun.jna.Memory;
+
+public class SerializingCacheProvider implements IRowCacheProvider
+{
+ public SerializingCacheProvider() throws ConfigurationException
+ {
+ try
+ {
+ Memory.class.getName();
+ }
+ catch (NoClassDefFoundError e)
+ {
+ throw new ConfigurationException("Cannot intialize SerializationCache without JNA in the class path");
+ }
+ }
+
+ public ICache<DecoratedKey, ColumnFamily> create(int capacity)
+ {
+ return new SerializingCache<DecoratedKey, ColumnFamily>(capacity, ColumnFamily.serializer());
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Apr 6 18:15:47 2011
@@ -223,7 +223,14 @@ public final class CFMetaData
memtableThroughputInMb = DEFAULT_MEMTABLE_THROUGHPUT_IN_MB;
memtableOperationsInMillions = DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS;
mergeShardsChance = DEFAULT_MERGE_SHARDS_CHANCE;
- rowCacheProvider = FBUtilities.newCacheProvider(DEFAULT_ROW_CACHE_PROVIDER);
+ try
+ {
+ rowCacheProvider = FBUtilities.newCacheProvider(DEFAULT_ROW_CACHE_PROVIDER);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new AssertionError(e); // the default provider should not error out
+ }
// Defaults strange or simple enough to not need a DEFAULT_T for
defaultValidator = BytesType.instance;
@@ -381,7 +388,17 @@ public final class CFMetaData
if (cf.memtable_throughput_in_mb != null) { newCFMD.memSize(cf.memtable_throughput_in_mb); }
if (cf.memtable_operations_in_millions != null) { newCFMD.memOps(cf.memtable_operations_in_millions); }
if (cf.merge_shards_chance != null) { newCFMD.mergeShardsChance(cf.merge_shards_chance); }
- if (cf.row_cache_provider != null) { newCFMD.rowCacheProvider(FBUtilities.newCacheProvider(cf.row_cache_provider.toString())); }
+ if (cf.row_cache_provider != null)
+ {
+ try
+ {
+ newCFMD.rowCacheProvider(FBUtilities.newCacheProvider(cf.row_cache_provider.toString()));
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
return newCFMD.comment(cf.comment.toString())
.rowCacheSize(cf.row_cache_size)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Apr 6 18:15:47 2011
@@ -31,8 +31,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.ICompactSerializer3;
-public class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+public class ColumnFamilySerializer implements ICompactSerializer3<ColumnFamily>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilySerializer.class);
@@ -142,4 +143,9 @@ public class ColumnFamilySerializer impl
cf.delete(input.readInt(), input.readLong());
return cf;
}
+
+ public long serializedSize(ColumnFamily cf)
+ {
+ return cf.serializedSize();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Apr 6 18:15:47 2011
@@ -729,11 +729,18 @@ public class ColumnFamilyStore implement
Memtable mt = getMemtableThreadSafe();
boolean flushRequested = mt.isThresholdViolated();
mt.put(key, columnFamily);
- ColumnFamily cachedRow = getRawCachedRow(key);
- if (cachedRow != null)
- cachedRow.addAll(columnFamily);
- writeStats.addNano(System.nanoTime() - start);
-
+ if (rowCache.isPutCopying())
+ {
+ invalidateCachedRow(key);
+ }
+ else
+ {
+ ColumnFamily cachedRow = getRawCachedRow(key);
+ if (cachedRow != null)
+ cachedRow.addAll(columnFamily);
+ writeStats.addNano(System.nanoTime() - start);
+ }
+
return flushRequested ? mt : null;
}
@@ -1087,12 +1094,15 @@ public class ColumnFamilyStore implement
if (cached == null)
return null;
- // make a deep copy of column data so we don't keep references to direct buffers, which
- // would prevent munmap post-compaction.
- for (IColumn column : cached.getSortedColumns())
+ if (!rowCache.isPutCopying())
{
- cached.remove(column.name());
- cached.addColumn(column.localCopy(this));
+ // make a deep copy of column data so we don't keep references to direct buffers, which
+ // would prevent munmap post-compaction.
+ for (IColumn column : cached.getSortedColumns())
+ {
+ cached.remove(column.name());
+ cached.addColumn(column.localCopy(this));
+ }
}
// avoid keeping a permanent reference to the original key buffer
Added: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer3.java Wed Apr 6 18:15:47 2011
@@ -0,0 +1,6 @@
+package org.apache.cassandra.io;
+
+public interface ICompactSerializer3<T> extends ICompactSerializer2<T>
+{
+ public long serializedSize(T t);
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java Wed Apr 6 18:15:47 2011
@@ -0,0 +1,45 @@
+package org.apache.cassandra.io.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.cache.FreeableMemory;
+
+import com.sun.xml.internal.ws.Closeable;
+
+public class MemoryInputStream extends AbstractDataInput implements DataInput, Closeable
+{
+ private final FreeableMemory mem;
+ private int position = 0;
+
+ public MemoryInputStream(FreeableMemory mem)
+ {
+ this.mem = mem;
+ }
+
+ public int read() throws IOException
+ {
+ return mem.getValidByte(position++) & 0xFF;
+ }
+
+ protected void seekInternal(int pos)
+ {
+ position = pos;
+ }
+
+ protected int getPosition()
+ {
+ return position;
+ }
+
+ public int skipBytes(int n) throws IOException
+ {
+ seekInternal(getPosition() + n);
+ return position;
+ }
+
+ public void close()
+ {
+ // do nothing.
+ }
+}
\ No newline at end of file
Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java?rev=1089555&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java Wed Apr 6 18:15:47 2011
@@ -0,0 +1,33 @@
+package org.apache.cassandra.io.util;
+
+import java.io.OutputStream;
+
+import com.sun.jna.Memory;
+
+/**
+ * This class provides a way to stream the writes into the {@link Memory}
+ */
+public class MemoryOutputStream extends OutputStream
+{
+
+ private final Memory mem;
+ private int position = 0;
+
+ public MemoryOutputStream(Memory mem)
+ {
+ this.mem = mem;
+ }
+
+ @Override
+ public void write(int b)
+ {
+ mem.setByte(this.position, (byte)b);
+ this.position++;
+ }
+
+ public int position()
+ {
+ return this.position;
+ }
+
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Apr 6 18:15:47 2011
@@ -655,18 +655,11 @@ public class FBUtilities
return field;
}
- public static IRowCacheProvider newCacheProvider(String cache_provider)
+ public static IRowCacheProvider newCacheProvider(String cache_provider) throws ConfigurationException
{
if (!cache_provider.contains("."))
cache_provider = "org.apache.cassandra.cache." + cache_provider;
- try
- {
- return FBUtilities.construct(cache_provider, "row cache provider");
- }
- catch (ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
+ return FBUtilities.construct(cache_provider, "row cache provider");
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java?rev=1089555&r1=1089554&r2=1089555&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java Wed Apr 6 18:15:47 2011
@@ -85,4 +85,13 @@ public class CacheProviderTest extends S
simpleCase(cf, cache);
concurrentCase(cf, cache);
}
+
+ @Test
+ public void testSerializingCache() throws InterruptedException
+ {
+ ICache<String, ColumnFamily> cache = new SerializingCache<String, ColumnFamily>(CAPACITY, ColumnFamily.serializer());
+ ColumnFamily cf = createCF();
+ simpleCase(cf, cache);
+ // concurrentCase(cf, cache);
+ }
}