You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/27 16:10:33 UTC
svn commit: r916958 - in /incubator/cassandra/branches/cassandra-0.6:
src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/io/
Author: jbellis
Date: Sat Feb 27 15:10:33 2010
New Revision: 916958
URL: http://svn.apache.org/viewvc?rev=916958&view=rev
Log:
Unify the key cache per column family under SSTableTracker, hooked in with setTrackedBy, which replaces addFinalizingReference.
patch by Stu Hood and jbellis for CASSANDRA-801
Removed:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/JMXAggregatingCache.java
incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableAccessor.java
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Feb 27 15:10:33 2010
@@ -18,51 +18,47 @@
package org.apache.cassandra.db;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.io.Closeable;
import java.lang.management.ManagementFactory;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
-import com.google.common.collect.AbstractIterator;
+import org.apache.log4j.Logger;
+import org.apache.commons.collections.IteratorUtils;
+
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.cache.IAggregatableCacheProvider;
+import com.google.common.collect.Iterators;
import org.apache.cassandra.cache.InstrumentedCache;
-import org.apache.cassandra.cache.JMXAggregatingCache;
import org.apache.cassandra.cache.JMXInstrumentedCache;
-import org.apache.log4j.Logger;
-
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableScanner;
+import org.apache.cassandra.io.SSTableTracker;
import org.apache.cassandra.io.util.FileUtils;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.*;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-import org.apache.commons.collections.IteratorUtils;
-
-import com.google.common.collect.Iterators;
-import com.google.common.base.Predicate;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -190,46 +186,13 @@
}
sstables.add(sstable);
}
- ssTables_ = new SSTableTracker();
+ ssTables_ = new SSTableTracker(table, columnFamilyName);
ssTables_.add(sstables);
- int cacheSize = DatabaseDescriptor.getRowsCachedFor(table, columnFamilyName, ssTables_.estimatedKeys());
+ int rowCacheSize = DatabaseDescriptor.getRowsCachedFor(table, columnFamilyName, ssTables_.estimatedKeys());
if (logger_.isDebugEnabled())
- logger_.debug("row cache capacity for " + columnFamilyName + " is " + cacheSize);
- rowCache = new JMXInstrumentedCache<String, ColumnFamily>(table, columnFamilyName + "RowCache", cacheSize);
-
- // we don't need to keep a reference to the key cache aggregator, just create it so it registers itself w/ JMX
- new JMXAggregatingCache(new Iterable<IAggregatableCacheProvider>()
- {
- public Iterator<IAggregatableCacheProvider> iterator()
- {
- final Iterator<SSTableReader> iter = ssTables_.iterator();
- return new AbstractIterator<IAggregatableCacheProvider>()
- {
- @Override
- protected IAggregatableCacheProvider computeNext()
- {
- if (!iter.hasNext())
- return endOfData();
-
- return new IAggregatableCacheProvider()
- {
- SSTableReader sstable = iter.next();
-
- public InstrumentedCache getCache()
- {
- return sstable.getKeyCache();
- }
-
- public long getObjectCount()
- {
- return sstable.getIndexPositions().size() * SSTableReader.indexInterval();
- }
- };
- }
- };
- }
- }, table, columnFamilyName + "KeyCache");
+ logger_.debug("row cache capacity for " + columnFamilyName + " is " + rowCacheSize);
+ rowCache = new JMXInstrumentedCache<String, ColumnFamily>(table, columnFamilyName + "RowCache", rowCacheSize);
}
public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily) throws IOException
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java Sat Feb 27 15:10:33 2010
@@ -32,6 +32,7 @@
import org.apache.cassandra.cache.InstrumentedCache;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
@@ -108,8 +109,7 @@
public static SSTableReader open(String dataFileName) throws IOException
{
- return open(dataFileName,
- StorageService.getPartitioner());
+ return open(dataFileName, StorageService.getPartitioner());
}
public static SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
@@ -122,8 +122,6 @@
sstable.loadIndexFile();
sstable.loadBloomFilter();
- long expectedKeys = (sstable.getIndexPositions().size() + 1) * INDEX_INTERVAL;
- sstable.keyCache = createKeyCache(parseTableName(dataFileName), parseColumnFamilyName(dataFileName), expectedKeys);
if (logger.isDebugEnabled())
logger.debug("INDEX LOAD TIME for " + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
@@ -135,24 +133,15 @@
private final MappedByteBuffer[] indexBuffers;
private final MappedByteBuffer[] buffers;
-
- public static InstrumentedCache<DecoratedKey, PositionSize> createKeyCache(String ksname, String cfname, long expectedKeys)
- {
- int keysToCache = DatabaseDescriptor.getKeysCachedFor(ksname, cfname, expectedKeys);
- return new InstrumentedCache<DecoratedKey, PositionSize>(keysToCache);
- }
-
- private InstrumentedCache<DecoratedKey, PositionSize> keyCache;
+ private InstrumentedCache<Pair<String, DecoratedKey>, PositionSize> keyCache;
SSTableReader(String filename,
IPartitioner partitioner,
List<KeyPosition> indexPositions, Map<KeyPosition, PositionSize> spannedIndexDataPositions,
- BloomFilter bloomFilter,
- InstrumentedCache<DecoratedKey, PositionSize> keyCache)
- throws IOException
+ BloomFilter bloomFilter)
+ throws IOException
{
super(filename, partitioner);
- assert keyCache != null;
if (DatabaseDescriptor.getIndexAccessMode() == DatabaseDescriptor.DiskAccessMode.mmap)
{
@@ -192,13 +181,13 @@
this.indexPositions = indexPositions;
this.spannedIndexDataPositions = spannedIndexDataPositions;
this.bf = bloomFilter;
- this.keyCache = keyCache;
}
- public void addFinalizingReference(SSTableTracker tracker)
+ public void setTrackedBy(SSTableTracker tracker)
{
phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
finalizers.add(phantomReference);
+ keyCache = tracker.getKeyCache();
}
private static MappedByteBuffer mmap(String filename, long start, int size) throws IOException
@@ -225,7 +214,7 @@
private SSTableReader(String filename, IPartitioner partitioner) throws IOException
{
- this(filename, partitioner, null, null, null, new InstrumentedCache<DecoratedKey, PositionSize>(0));
+ this(filename, partitioner, null, null, null);
}
public List<KeyPosition> getIndexPositions()
@@ -233,6 +222,11 @@
return indexPositions;
}
+ public long estimatedKeys()
+ {
+ return indexPositions.size() * INDEX_INTERVAL;
+ }
+
void loadBloomFilter() throws IOException
{
DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
@@ -320,21 +314,29 @@
*/
public PositionSize getPosition(DecoratedKey decoratedKey) throws IOException
{
+ // first, check bloom filter
if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
return null;
- if (keyCache.getCapacity() > 0)
+
+ // next, the key cache
+ Pair<String, DecoratedKey> unifiedKey = new Pair<String, DecoratedKey>(path, decoratedKey);
+ if (keyCache != null && keyCache.getCapacity() > 0)
{
- PositionSize cachedPosition = keyCache.get(decoratedKey);
+ PositionSize cachedPosition = keyCache.get(unifiedKey);
if (cachedPosition != null)
{
return cachedPosition;
}
}
+
+ // next, see if the sampled index says it's impossible for the key to be present
KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
if (sampledPosition == null)
{
return null;
}
+
+ // handle exact sampled index hit
if (spannedIndexDataPositions != null)
{
PositionSize info = spannedIndexDataPositions.get(sampledPosition);
@@ -342,6 +344,7 @@
return info;
}
+ // scan the on-disk index, starting at the nearest sampled position
long p = sampledPosition.position;
FileDataInput input;
if (indexBuffers == null)
@@ -383,8 +386,8 @@
{
info = new PositionSize(position, length() - position);
}
- if (keyCache.getCapacity() > 0)
- keyCache.put(decoratedKey, info);
+ if (keyCache != null && keyCache.getCapacity() > 0)
+ keyCache.put(unifiedKey, info);
return info;
}
if (v > 0)
@@ -502,11 +505,6 @@
return ColumnFamily.create(getTableName(), getColumnFamilyName());
}
- public InstrumentedCache<DecoratedKey, PositionSize> getKeyCache()
- {
- return keyCache;
- }
-
public ICompactSerializer2<IColumn> getColumnSerializer()
{
return DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName()).equals("Standard")
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java Sat Feb 27 15:10:33 2010
@@ -25,16 +25,32 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.cassandra.cache.JMXInstrumentedCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.utils.Pair;
+
+import org.apache.log4j.Logger;
public class SSTableTracker implements Iterable<SSTableReader>
{
+ private static final Logger logger = Logger.getLogger(SSTableTracker.class);
+
private volatile Set<SSTableReader> sstables;
private final AtomicLong liveSize = new AtomicLong();
private final AtomicLong totalSize = new AtomicLong();
- public SSTableTracker()
+ private final String ksname;
+ private final String cfname;
+
+ private final JMXInstrumentedCache<Pair<String, DecoratedKey>, SSTable.PositionSize> keyCache;
+
+ public SSTableTracker(String ksname, String cfname)
{
- this.sstables = Collections.<SSTableReader>emptySet();
+ this.ksname = ksname;
+ this.cfname = cfname;
+ sstables = Collections.emptySet();
+ keyCache = new JMXInstrumentedCache<Pair<String, DecoratedKey>, SSTable.PositionSize>(ksname, cfname + "KeyCache", 0);
}
public synchronized void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) throws IOException
@@ -48,7 +64,7 @@
long size = sstable.bytesOnDisk();
liveSize.addAndGet(size);
totalSize.addAndGet(size);
- sstable.addFinalizingReference(this);
+ sstable.setTrackedBy(this);
}
for (SSTableReader sstable : oldSSTables)
@@ -60,6 +76,15 @@
}
sstables = Collections.unmodifiableSet(sstablesNew);
+
+ int keyCacheSize = DatabaseDescriptor.getKeysCachedFor(ksname, cfname, estimatedKeys());
+ if (keyCacheSize != keyCache.getCapacity())
+ {
+ // update cache size for the new key volume
+ if (logger.isDebugEnabled())
+ logger.debug("key cache capacity for " + cfname + " is " + keyCacheSize);
+ keyCache.setCapacity(keyCacheSize);
+ }
}
public synchronized void add(Iterable<SSTableReader> sstables)
@@ -107,7 +132,7 @@
long n = 0;
for (SSTableReader sstable : this)
{
- n += sstable.getIndexPositions().size() * SSTableReader.INDEX_INTERVAL;
+ n += sstable.estimatedKeys();
}
return n;
}
@@ -126,5 +151,10 @@
{
totalSize.addAndGet(-size);
}
+
+ public JMXInstrumentedCache<Pair<String, DecoratedKey>, SSTable.PositionSize> getKeyCache()
+ {
+ return keyCache;
+ }
}
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java Sat Feb 27 15:10:33 2010
@@ -21,21 +21,23 @@
*/
-import java.io.*;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOError;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.log4j.Logger;
-import org.apache.cassandra.cache.InstrumentedCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.DataOutputBuffer;
public class SSTableWriter extends SSTable
{
@@ -151,8 +153,7 @@
rename(filterFilename());
path = rename(path); // important to do this last since index & filter file names are derived from it
- InstrumentedCache<DecoratedKey, PositionSize> keyCache = SSTableReader.createKeyCache(getTableName(), getColumnFamilyName(), keysWritten);
- return new SSTableReader(path, partitioner, indexPositions, spannedIndexDataPositions, bf, keyCache);
+ return new SSTableReader(path, partitioner, indexPositions, spannedIndexDataPositions, bf);
}
static String rename(String tmpFilename)
Modified: incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java Sat Feb 27 15:10:33 2010
@@ -41,7 +41,7 @@
TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
map.put(key, bytes);
- SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", "singlewrite", map);
+ SSTableReader ssTable = SSTableUtils.writeRawSSTable("Keyspace1", "Standard1", map);
// verify
verifySingle(ssTable, bytes, key);
@@ -69,7 +69,7 @@
}
// write
- SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", "manywrites", map);
+ SSTableReader ssTable = SSTableUtils.writeRawSSTable("Keyspace1", "Standard2", map);
// verify
verifyMany(ssTable, map);