You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/02/28 19:03:12 UTC
[4/7] git commit: Fix race between writes and read for cache
Fix race between writes and reads for the row cache
patch by jbellis and slebresne; reviewed by jbellis and slebresne for CASSANDRA-3862
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9270f4e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9270f4e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9270f4e
Branch: refs/heads/trunk
Commit: c9270f4e3ae5f94d46070f1c7e585c90bc68df7c
Parents: aa75168
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Feb 28 18:53:32 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Feb 28 18:53:32 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/cache/ConcurrentLinkedHashCache.java | 10 ++
.../cache/ConcurrentLinkedHashCacheProvider.java | 13 +-
src/java/org/apache/cassandra/cache/ICache.java | 4 +
.../org/apache/cassandra/cache/IRowCacheEntry.java | 5 +
.../apache/cassandra/cache/IRowCacheProvider.java | 2 +-
.../apache/cassandra/cache/InstrumentingCache.java | 10 ++
.../apache/cassandra/cache/RowCacheSentinel.java | 45 ++++++
.../apache/cassandra/cache/SerializingCache.java | 35 ++++-
.../cassandra/cache/SerializingCacheProvider.java | 47 ++++++-
src/java/org/apache/cassandra/db/ColumnFamily.java | 9 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 119 +++++++++++----
.../apache/cassandra/db/RowIteratorFactory.java | 2 +
.../db/compaction/CompactionIterable.java | 2 +-
.../db/compaction/ParallelCompactionIterable.java | 2 +-
.../org/apache/cassandra/service/CacheService.java | 8 +-
.../cassandra/streaming/IncomingStreamReader.java | 3 +-
.../org/apache/cassandra/utils/StatusLogger.java | 3 +-
.../unit/org/apache/cassandra/db/RowCacheTest.java | 4 +-
19 files changed, 264 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41316eb..b5b79a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
* ignore deprecated KsDef/CfDef/ColumnDef fields in native schema (CASSANDRA-3963)
* CLI to report when unsupported column_metadata pair was given (CASSANDRA-3959)
* reincarnate removed and deprecated KsDef/CfDef attributes (CASSANDRA-3953)
+ * Fix race between writes and read for cache (CASSANDRA-3862)
Merged from 1.0:
* remove the wait on hint future during write (CASSANDRA-3870)
* (cqlsh) ignore missing CfDef opts (CASSANDRA-3933)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
index 8f4d2f0..a1cf4ea 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
@@ -117,6 +117,16 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V>
map.put(key, value);
}
+ /**
+ * Stores value under key only if the backing map has no mapping for key yet.
+ * @return true if the value was stored, false if key was already mapped
+ */
+ public boolean putIfAbsent(K key, V value)
+ {
+ V existing = map.putIfAbsent(key, value);
+ return existing == null;
+ }
+
+ /**
+ * Delegates to the backing map's replace(key, old, value).
+ * @return true if key was mapped to old and now maps to value
+ */
+ public boolean replace(K key, V old, V value)
+ {
+ boolean swapped = map.replace(key, old, value);
+ return swapped;
+ }
+
public void remove(K key)
{
map.remove(key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
index 851d4c5..71babd6 100644
--- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.cache;
*
*/
-import org.apache.cassandra.db.ColumnFamily;
-
import com.googlecode.concurrentlinkedhashmap.Weigher;
import com.googlecode.concurrentlinkedhashmap.Weighers;
@@ -29,21 +27,20 @@ import org.github.jamm.MemoryMeter;
public class ConcurrentLinkedHashCacheProvider implements IRowCacheProvider
{
+ /**
+ * Creates a ConcurrentLinkedHashCache holding IRowCacheEntry values (cached rows or sentinels).
+ * When useMemoryWeigher is true, each entry is weighed by its measured memory size (see createMemoryWeigher);
+ * otherwise the library's singleton weigher is used (presumably one unit per entry -- TODO confirm).
+ */
- public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher)
+ public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher)
{
return ConcurrentLinkedHashCache.create(capacity, useMemoryWeigher
? createMemoryWeigher()
- : Weighers.<ColumnFamily>singleton());
+ : Weighers.<IRowCacheEntry>singleton());
}
- private static Weigher<ColumnFamily> createMemoryWeigher()
+ private static Weigher<IRowCacheEntry> createMemoryWeigher()
{
- return new Weigher<ColumnFamily>()
+ return new Weigher<IRowCacheEntry>()
{
final MemoryMeter meter = new MemoryMeter();
- @Override
- public int weightOf(ColumnFamily value)
+ public int weightOf(IRowCacheEntry value)
{
return (int) Math.min(meter.measure(value), Integer.MAX_VALUE);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ICache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ICache.java b/src/java/org/apache/cassandra/cache/ICache.java
index 48e045c..5f8e00b 100644
--- a/src/java/org/apache/cassandra/cache/ICache.java
+++ b/src/java/org/apache/cassandra/cache/ICache.java
@@ -36,6 +36,10 @@ public interface ICache<K, V>
public void put(K key, V value);
+ public boolean putIfAbsent(K key, V value);
+
+ public boolean replace(K key, V old, V value);
+
public V get(K key);
public void remove(K key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/IRowCacheEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/IRowCacheEntry.java b/src/java/org/apache/cassandra/cache/IRowCacheEntry.java
new file mode 100644
index 0000000..7340428
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/IRowCacheEntry.java
@@ -0,0 +1,5 @@
+package org.apache.cassandra.cache;
+
+/**
+ * Marker interface for values stored in the row cache.
+ *
+ * An entry is either a cached row (ColumnFamily) or a RowCacheSentinel that a reader has
+ * placed while it loads the row from disk; see CASSANDRA-3862.
+ */
+public interface IRowCacheEntry
+{
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
index 9209ced..9e1eb7c 100644
--- a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java
@@ -27,5 +27,5 @@ import org.apache.cassandra.db.ColumnFamily;
*/
public interface IRowCacheProvider
{
+ /**
+ * Creates the row cache.
+ * @param capacity maximum capacity of the cache (CacheService passes a size in bytes)
+ * @param useMemoryWeigher whether entries should be weighed by their memory footprint rather than counted
+ * @return a cache mapping row keys to IRowCacheEntry values (cached rows or sentinels)
+ */
- public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher);
+ public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/InstrumentingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/InstrumentingCache.java b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
index 36630ac..b4d048f 100644
--- a/src/java/org/apache/cassandra/cache/InstrumentingCache.java
+++ b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
@@ -45,6 +45,16 @@ public class InstrumentingCache<K, V>
map.put(key, value);
}
+ /**
+ * Delegates to the wrapped cache's putIfAbsent.
+ * @return true if the wrapped cache stored the value
+ */
+ public boolean putIfAbsent(K key, V value)
+ {
+ boolean stored = map.putIfAbsent(key, value);
+ return stored;
+ }
+
+ /**
+ * Delegates to the wrapped cache's replace.
+ * @return true if the wrapped cache swapped old for value
+ */
+ public boolean replace(K key, V old, V value)
+ {
+ boolean replaced = map.replace(key, old, value);
+ return replaced;
+ }
+
public V get(K key)
{
V v = map.get(key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/RowCacheSentinel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheSentinel.java b/src/java/org/apache/cassandra/cache/RowCacheSentinel.java
new file mode 100644
index 0000000..381160a
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/RowCacheSentinel.java
@@ -0,0 +1,45 @@
+package org.apache.cassandra.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A placeholder stored in the row cache while a reader loads the full row from disk.
+ *
+ * The reader installs a fresh sentinel with putIfAbsent, reads the row, and then swaps the sentinel
+ * for the row with replace(key, sentinel, row). A write that finds a sentinel invalidates it, so that
+ * replace fails and possibly-stale data is never cached. See ColumnFamilyStore.getThroughCache and
+ * CASSANDRA-3862.
+ *
+ * Sentinels are compared by id only. A copy rebuilt by a serializing cache's deserializer is
+ * therefore equal to the instance that was originally put.
+ */
+public class RowCacheSentinel implements IRowCacheEntry
+{
+ // source of sentinel ids, unique within this JVM
+ private static final AtomicLong generator = new AtomicLong();
+
+ // package-private so the row cache serializer (SerializingCacheProvider) can write it out
+ final long sentinelId;
+
+ public RowCacheSentinel()
+ {
+ sentinelId = generator.getAndIncrement();
+ }
+
+ /**
+ * Recreates a sentinel with a known id, e.g. when deserializing it from the cache.
+ */
+ RowCacheSentinel(long sentinelId)
+ {
+ this.sentinelId = sentinelId;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (!(o instanceof RowCacheSentinel))
+ return false;
+
+ RowCacheSentinel other = (RowCacheSentinel) o;
+ return this.sentinelId == other.sentinelId;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ // same mixing as Long.hashCode(), without boxing the id
+ return (int) (sentinelId ^ (sentinelId >>> 32));
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RowCacheSentinel(" + sentinelId + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index b8844cb..4946fb0 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -76,7 +76,6 @@ public class SerializingCache<K, V> implements ICache<K, V>
{
return new Weigher<FreeableMemory>()
{
- @Override
public int weightOf(FreeableMemory value)
{
return (int) Math.min(value.size(), Integer.MAX_VALUE);
@@ -182,6 +181,40 @@ public class SerializingCache<K, V> implements ICache<K, V>
old.unreference();
}
+ /**
+ * Serializes value and stores it under key only if key has no mapping yet.
+ * @return true if the value was stored; false if key was already mapped or the value could not be serialized
+ */
+ public boolean putIfAbsent(K key, V value)
+ {
+ FreeableMemory mem = serialize(value);
+ if (mem == null)
+ return false; // out of memory. never mind.
+
+ FreeableMemory old = map.putIfAbsent(key, mem);
+ if (old != null)
+ {
+ // the new value was not put, we've uselessly allocated some memory, free it
+ mem.unreference();
+ }
+ return old == null;
+ }
+
+ /**
+ * Replaces the entry for key with value, but only if the currently stored entry deserializes to
+ * a value equal to oldToReplace.
+ * @return true if the replacement happened
+ */
+ public boolean replace(K key, V oldToReplace, V value)
+ {
+ // if there is no old value in our map, we fail
+ FreeableMemory old = map.get(key);
+ if (old == null)
+ return false;
+
+ // NOTE(review): old is read without taking a reference; a concurrent put/remove may unreference
+ // it before we deserialize. Confirm FreeableMemory guards against use-after-free here.
+ V oldValue = deserialize(old);
+ // compare before serializing the replacement, so a mismatch costs no allocation;
+ // an entry that could not be deserialized is treated as a mismatch
+ if (oldValue == null || !oldValue.equals(oldToReplace))
+ return false;
+
+ FreeableMemory mem = serialize(value);
+ if (mem == null)
+ return false; // out of memory. never mind.
+
+ // only succeeds if key still maps to the exact buffer we compared against
+ if (map.replace(key, old, mem))
+ {
+ old.unreference();
+ return true;
+ }
+ mem.unreference();
+ return false;
+ }
+
public void remove(K key)
{
FreeableMemory mem = map.remove(key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index f71684b..3a06d36 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -20,12 +20,55 @@ package org.apache.cassandra.cache;
*
*/
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.io.ISerializer;
public class SerializingCacheProvider implements IRowCacheProvider
{
- public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher)
+ /**
+ * Creates a SerializingCache whose entries are encoded with RowCacheSerializer.
+ */
+ public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher)
+ {
+ return new SerializingCache<RowCacheKey, IRowCacheEntry>(capacity, useMemoryWeigher, new RowCacheSerializer());
+ }
+
+ /**
+ * Encodes a row cache entry as a boolean "is sentinel" flag followed by either the sentinel id
+ * (a long) or the serialized ColumnFamily.
+ */
+ private static class RowCacheSerializer implements ISerializer<IRowCacheEntry>
{
- return new SerializingCache<RowCacheKey, ColumnFamily>(capacity, useMemoryWeigher, ColumnFamily.serializer());
+ public void serialize(IRowCacheEntry entry, DataOutput out)
+ {
+ assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
+ try
+ {
+ boolean isSentinel = entry instanceof RowCacheSentinel;
+ out.writeBoolean(isSentinel);
+ if (isSentinel)
+ out.writeLong(((RowCacheSentinel) entry).sentinelId);
+ else
+ ColumnFamily.serializer.serialize((ColumnFamily) entry, out);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public IRowCacheEntry deserialize(DataInput in) throws IOException
+ {
+ boolean isSentinel = in.readBoolean();
+ if (isSentinel)
+ return new RowCacheSentinel(in.readLong());
+ return ColumnFamily.serializer.deserialize(in);
+ }
+
+ // must match serialize() exactly: the flag plus either the long id or the serialized row
+ public long serializedSize(IRowCacheEntry entry)
+ {
+ long payload = entry instanceof RowCacheSentinel
+ ? DBConstants.longSize
+ : ColumnFamily.serializer.serializedSize((ColumnFamily) entry);
+ return DBConstants.boolSize + payload;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 9191df6..740a0a6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -25,6 +25,7 @@ import java.security.MessageDigest;
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.cassandra.cache.IRowCacheEntry;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.QueryPath;
@@ -36,10 +37,10 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.HeapAllocator;
-public class ColumnFamily extends AbstractColumnContainer
+public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEntry
{
- /* The column serializer for this Column Family. Create based on config. */
- private static ColumnFamilySerializer serializer = new ColumnFamilySerializer();
+ public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer();
+
private final CFMetaData cfm;
public static ColumnFamilySerializer serializer()
@@ -77,7 +78,7 @@ public class ColumnFamily extends AbstractColumnContainer
return new ColumnFamily(cfm, factory.create(cfm.comparator, reversedInsertOrder));
}
- private ColumnFamily(CFMetaData cfm, ISortedColumns map)
+ protected ColumnFamily(CFMetaData cfm, ISortedColumns map)
{
super(map);
assert cfm != null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 165b150..a8d1fc8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
@@ -28,14 +30,17 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import javax.management.*;
-import com.google.common.collect.*;
-
-import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.CacheService;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
@@ -54,9 +59,11 @@ import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.*;
@@ -386,13 +393,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
long start = System.currentTimeMillis();
- AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = CacheService.instance.rowCache;
+ AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache;
// results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup
int cachedRowsRead = 0;
for (DecoratedKey key : rowCache.readSaved(table.name, columnFamily))
{
- cacheRow(metadata.cfId, key);
+ ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(columnFamily)),
+ Integer.MIN_VALUE,
+ true);
+ // a saved key may no longer have any data; the row cache does not store null rows
+ if (data == null)
+ continue;
+ CacheService.instance.rowCache.put(new RowCacheKey(metadata.cfId, key), data);
+ cachedRowsRead++;
}
if (cachedRowsRead > 0)
@@ -708,15 +718,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key);
+ // always invalidate a copying cache value
if (CacheService.instance.rowCache.isPutCopying())
{
invalidateCachedRow(cacheKey);
+ return;
}
- else
+
+ // invalidate a normal cache value if it's a sentinel, so the read will retry (and include the new update)
+ IRowCacheEntry cachedRow = getCachedRowInternal(cacheKey);
+ if (cachedRow != null)
{
- ColumnFamily cachedRow = getRawCachedRow(cacheKey);
- if (cachedRow != null)
- cachedRow.addAll(columnFamily, HeapAllocator.instance);
+ if (cachedRow instanceof RowCacheSentinel)
+ invalidateCachedRow(cacheKey);
+ else
+ ((ColumnFamily) cachedRow).addAll(columnFamily, HeapAllocator.instance);
}
}
@@ -1088,30 +1104,52 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
}
- public ColumnFamily cacheRow(Integer cfId, DecoratedKey decoratedKey)
+ /**
+ * fetch the row given by filter.key if it is in the cache; if not, read it from disk and cache it
+ * @param cfId the column family to read the row from
+ * @param filter the columns being queried. Note that we still cache entire rows, but if a row is uncached
+ * and we race to cache it, only the winner will read the entire row
+ * @return the entire row for filter.key, if present in the cache (or we can cache it), or just the columns
+ * specified by filter otherwise; null when the read finds no data for the key
+ */
+ private ColumnFamily getThroughCache(Integer cfId, QueryFilter filter)
{
assert isRowCacheEnabled()
- : String.format("Row cache is not enabled on column family [" + getColumnFamilyName() + "]");
+ : String.format("Row cache is not enabled on column family [" + getColumnFamilyName() + "]");
- RowCacheKey key = new RowCacheKey(cfId, decoratedKey);
+ RowCacheKey key = new RowCacheKey(cfId, filter.key);
- ColumnFamily cached;
-
- if ((cached = CacheService.instance.rowCache.get(key)) == null)
+ // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our
+ // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
+ IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+ if (cached != null)
{
- // We force ThreadSafeSortedColumns because cached row will be accessed concurrently
- cached = getTopLevelColumns(QueryFilter.getIdentityFilter(decoratedKey, new QueryPath(columnFamily)),
- Integer.MIN_VALUE,
- true);
+ if (cached instanceof RowCacheSentinel)
+ {
+ // Some other read is trying to cache the value, just do a normal non-caching read
+ return getTopLevelColumns(filter, Integer.MIN_VALUE, false);
+ }
+ return (ColumnFamily) cached;
+ }
- if (cached == null)
- return null;
+ RowCacheSentinel sentinel = new RowCacheSentinel();
+ boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
+ if (!sentinelSuccess)
+ {
+ // we lost the race: something else now occupies the key, so (as on the sentinel path above)
+ // only read the columns the filter asks for, without caching
+ return getTopLevelColumns(filter, Integer.MIN_VALUE, false);
+ }
- // avoid keeping a permanent reference to the original key buffer
- CacheService.instance.rowCache.put(key, cached);
- }
+ // declared outside the try so the finally block can tell whether the read produced a row
+ ColumnFamily data = null;
+ try
+ {
+ // We force ThreadSafeSortedColumns because cached row will be accessed concurrently
+ data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, new QueryPath(columnFamily)),
+ Integer.MIN_VALUE,
+ true);
+ if (data != null)
+ {
+ // fails (harmlessly) if a write invalidated our sentinel meanwhile: we still return what we read,
+ // it just doesn't get cached
+ CacheService.instance.rowCache.replace(key, sentinel, data);
+ }
- return cached;
+ return data;
+ }
+ finally
+ {
+ // no row, or the read threw: don't leave our sentinel behind, so later reads can cache the row
+ if (data == null)
+ CacheService.instance.rowCache.remove(key);
+ }
}
ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore)
@@ -1137,7 +1175,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (cfId == null)
return null; // secondary index
- ColumnFamily cached = cacheRow(cfId, filter.key);
+ ColumnFamily cached = getThroughCache(cfId, filter);
if (cached == null)
return null;
@@ -1484,21 +1522,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return data.getSSTables().size();
}
- /** raw cached row -- does not fetch the row if it is not present. not counted in cache statistics. */
-
+ /**
+ * Returns the cached row for the given key if it is already present in the row cache.
+ * Unlike getThroughCache, this never reads the row from disk or populates the cache, and
+ * these calls are not counted in cache statistics.
+ *
+ * Note that this WILL cause deserialization of a SerializingCache row, so if all you
+ * need to know is whether a row is present or not, use containsCachedRow instead.
+ *
+ * @param key the row key
+ * @return the cached row, or null if it is not cached, is only held by a RowCacheSentinel
+ * (a read is still populating it), or this store is a secondary index
+ */
public ColumnFamily getRawCachedRow(DecoratedKey key)
{
if (metadata.cfId == null)
return null; // secondary index
- return getRawCachedRow(new RowCacheKey(metadata.cfId, key));
+ IRowCacheEntry cached = getCachedRowInternal(new RowCacheKey(metadata.cfId, key));
+ return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily) cached;
}
+ /**
+ * Raw row cache lookup that may return a RowCacheSentinel as well as a cached row.
+ * Returns null when the row cache capacity is 0.
+ */
- public ColumnFamily getRawCachedRow(RowCacheKey key)
+ private IRowCacheEntry getCachedRowInternal(RowCacheKey key)
{
return CacheService.instance.rowCache.getCapacity() == 0 ? null : CacheService.instance.rowCache.getInternal(key);
}
+ /**
+ * Checks whether the row cache has an entry for the given key, without deserializing it.
+ * Note that a RowCacheSentinel (a read still populating the row) also counts as present.
+ *
+ * @param key the row key
+ * @return true if the row cache capacity is non-zero and it has an entry for key; always
+ * false for a secondary index store, which has no cfId
+ */
+ public boolean containsCachedRow(DecoratedKey key)
+ {
+ if (metadata.cfId == null)
+ return false; // secondary index, mirrors getRawCachedRow
+ return CacheService.instance.rowCache.getCapacity() != 0
+ && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key));
+ }
+
public void invalidateCachedRow(RowCacheKey key)
{
CacheService.instance.rowCache.remove(key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 34fe07a..a31f6ec 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -103,8 +103,10 @@ public class RowIteratorFactory
// First check if this row is in the rowCache. If it is we can skip the rest
ColumnFamily cached = cfs.getRawCachedRow(key);
if (cached == null)
+ {
// not cached: collate
filter.collateColumns(returnCF, colIters, gcBefore);
+ }
else
{
QueryFilter keyFilter = new QueryFilter(key, filter.path, filter.filter);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 270c3af..2fd0240 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -110,7 +110,7 @@ public class CompactionIterable extends AbstractCompactionIterable
// If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
// like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
// memory on long running instances
- controller.removeDeletedInCache(compactedRow.key);
+ controller.invalidateCachedRow(compactedRow.key);
}
return compactedRow;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index dba8f55..9b67676 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -130,7 +130,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
// If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
// like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
// memory on long running instances
- controller.removeDeletedInCache(compactedRow.key);
+ controller.invalidateCachedRow(compactedRow.key);
return compactedRow;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 8dd6bde..c70c45d 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -62,7 +62,7 @@ public class CacheService implements CacheServiceMBean
public final static CacheService instance = new CacheService();
public final AutoSavingCache<KeyCacheKey, Long> keyCache;
- public final AutoSavingCache<RowCacheKey, ColumnFamily> rowCache;
+ public final AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache;
private int rowCacheSavePeriod;
private int keyCacheSavePeriod;
@@ -116,7 +116,7 @@ public class CacheService implements CacheServiceMBean
/**
* @return initialized row cache
*/
- private AutoSavingCache<RowCacheKey, ColumnFamily> initRowCache()
+ private AutoSavingCache<RowCacheKey, IRowCacheEntry> initRowCache()
{
logger.info("Initializing row cache with capacity of {} MBs and provider {}",
DatabaseDescriptor.getRowCacheSizeInMB(),
@@ -125,8 +125,8 @@ public class CacheService implements CacheServiceMBean
int rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024;
// cache object
- ICache<RowCacheKey, ColumnFamily> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true);
- AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = new AutoSavingCache<RowCacheKey, ColumnFamily>(rc, CacheType.ROW_CACHE);
+ ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true);
+ AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE);
int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index f57b400..915d3bc 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -124,8 +124,7 @@ public class IncomingStreamReader
key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
long dataSize = SSTableReader.readRowSize(in, localFile.desc);
- ColumnFamily cached = cfs.getRawCachedRow(key);
- if (cached != null && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
+ if (cfs.containsCachedRow(key) && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
{
// need to update row cache
// Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index 9d1ff68..1185315 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.IRowCacheEntry;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.cache.RowCacheKey;
import org.apache.cassandra.db.ColumnFamily;
@@ -90,7 +91,7 @@ public class StatusLogger
// Global key/row cache information
AutoSavingCache<KeyCacheKey, Long> keyCache = CacheService.instance.keyCache;
- AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = CacheService.instance.rowCache;
+ AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache;
int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 9bce4e8..da5cf63 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -66,7 +66,7 @@ public class RowCacheTest extends CleanupHelper
cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
assert CacheService.instance.rowCache.size() == i + 1;
- assert cachedStore.getRawCachedRow(key) != null; // current key should be stored in the cache
+ assert cachedStore.containsCachedRow(key); // current key should be stored in the cache
// checking if column is read correctly after cache
ColumnFamily cf = cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
@@ -88,7 +88,7 @@ public class RowCacheTest extends CleanupHelper
QueryPath path = new QueryPath(COLUMN_FAMILY, null, ByteBufferUtil.bytes("col" + i));
cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
- assert cachedStore.getRawCachedRow(key) != null; // cache should be populated with the latest rows read (old ones should be popped)
+ assert cachedStore.containsCachedRow(key); // cache should be populated with the latest rows read (old ones should be popped)
// checking if column is read correctly after cache
ColumnFamily cf = cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);