You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/27 16:10:33 UTC

svn commit: r916958 - in /incubator/cassandra/branches/cassandra-0.6: src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/io/

Author: jbellis
Date: Sat Feb 27 15:10:33 2010
New Revision: 916958

URL: http://svn.apache.org/viewvc?rev=916958&view=rev
Log:
Unify key cache per CF under SSTableTracker, hooked in w/ setTrackedBy replacing addFinalizingReference.
patch by Stu Hood and jbellis for CASSANDRA-801

Removed:
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/JMXAggregatingCache.java
    incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableAccessor.java
Modified:
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
    incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Feb 27 15:10:33 2010
@@ -18,51 +18,47 @@
 
 package org.apache.cassandra.db;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.io.Closeable;
 import java.lang.management.ManagementFactory;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
-import com.google.common.collect.AbstractIterator;
+import org.apache.log4j.Logger;
+import org.apache.commons.collections.IteratorUtils;
+
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import org.apache.cassandra.cache.IAggregatableCacheProvider;
+import com.google.common.collect.Iterators;
 import org.apache.cassandra.cache.InstrumentedCache;
-import org.apache.cassandra.cache.JMXAggregatingCache;
 import org.apache.cassandra.cache.JMXInstrumentedCache;
-import org.apache.log4j.Logger;
-
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableScanner;
+import org.apache.cassandra.io.SSTableTracker;
 import org.apache.cassandra.io.util.FileUtils;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.utils.*;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-import org.apache.commons.collections.IteratorUtils;
-
-import com.google.common.collect.Iterators;
-import com.google.common.base.Predicate;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -190,46 +186,13 @@
             }
             sstables.add(sstable);
         }
-        ssTables_ = new SSTableTracker();
+        ssTables_ = new SSTableTracker(table, columnFamilyName);
         ssTables_.add(sstables);
 
-        int cacheSize = DatabaseDescriptor.getRowsCachedFor(table, columnFamilyName, ssTables_.estimatedKeys());
+        int rowCacheSize = DatabaseDescriptor.getRowsCachedFor(table, columnFamilyName, ssTables_.estimatedKeys());
         if (logger_.isDebugEnabled())
-            logger_.debug("row cache capacity for " + columnFamilyName + " is " + cacheSize);
-        rowCache = new JMXInstrumentedCache<String, ColumnFamily>(table, columnFamilyName + "RowCache", cacheSize);
-
-        // we don't need to keep a reference to the key cache aggregator, just create it so it registers itself w/ JMX
-        new JMXAggregatingCache(new Iterable<IAggregatableCacheProvider>()
-        {
-            public Iterator<IAggregatableCacheProvider> iterator()
-            {
-                final Iterator<SSTableReader> iter = ssTables_.iterator();
-                return new AbstractIterator<IAggregatableCacheProvider>()
-                {
-                    @Override
-                    protected IAggregatableCacheProvider computeNext()
-                    {
-                        if (!iter.hasNext())
-                            return endOfData();
-
-                        return new IAggregatableCacheProvider()
-                        {
-                            SSTableReader sstable = iter.next();
-
-                            public InstrumentedCache getCache()
-                            {
-                                return sstable.getKeyCache();
-                            }
-
-                            public long getObjectCount()
-                            {
-                                return sstable.getIndexPositions().size() * SSTableReader.indexInterval();
-                            }
-                        };
-                    }
-                };
-            }
-        }, table, columnFamilyName + "KeyCache");
+            logger_.debug("row cache capacity for " + columnFamilyName + " is " + rowCacheSize);
+        rowCache = new JMXInstrumentedCache<String, ColumnFamily>(table, columnFamilyName + "RowCache", rowCacheSize);
     }
 
     public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily) throws IOException

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java Sat Feb 27 15:10:33 2010
@@ -32,6 +32,7 @@
 import org.apache.cassandra.cache.InstrumentedCache;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -108,8 +109,7 @@
 
     public static SSTableReader open(String dataFileName) throws IOException
     {
-        return open(dataFileName,
-                    StorageService.getPartitioner());
+        return open(dataFileName, StorageService.getPartitioner());
     }
 
     public static SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
@@ -122,8 +122,6 @@
         sstable.loadIndexFile();
         sstable.loadBloomFilter();
 
-        long expectedKeys = (sstable.getIndexPositions().size() + 1) * INDEX_INTERVAL;
-        sstable.keyCache = createKeyCache(parseTableName(dataFileName), parseColumnFamilyName(dataFileName), expectedKeys);
         if (logger.isDebugEnabled())
             logger.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
 
@@ -135,24 +133,15 @@
     private final MappedByteBuffer[] indexBuffers;
     private final MappedByteBuffer[] buffers;
 
-
-    public static InstrumentedCache<DecoratedKey, PositionSize> createKeyCache(String ksname, String cfname, long expectedKeys)
-    {
-        int keysToCache = DatabaseDescriptor.getKeysCachedFor(ksname, cfname, expectedKeys);
-        return new InstrumentedCache<DecoratedKey, PositionSize>(keysToCache);
-    }
-
-    private InstrumentedCache<DecoratedKey, PositionSize> keyCache;
+    private InstrumentedCache<Pair<String, DecoratedKey>, PositionSize> keyCache;
 
     SSTableReader(String filename,
                   IPartitioner partitioner,
                   List<KeyPosition> indexPositions, Map<KeyPosition, PositionSize> spannedIndexDataPositions,
-                  BloomFilter bloomFilter,
-                  InstrumentedCache<DecoratedKey, PositionSize> keyCache)
-            throws IOException
+                  BloomFilter bloomFilter)
+    throws IOException
     {
         super(filename, partitioner);
-        assert keyCache != null;
 
         if (DatabaseDescriptor.getIndexAccessMode() == DatabaseDescriptor.DiskAccessMode.mmap)
         {
@@ -192,13 +181,13 @@
         this.indexPositions = indexPositions;
         this.spannedIndexDataPositions = spannedIndexDataPositions;
         this.bf = bloomFilter;
-        this.keyCache = keyCache;
     }
 
-    public void addFinalizingReference(SSTableTracker tracker)
+    public void setTrackedBy(SSTableTracker tracker)
     {
         phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
         finalizers.add(phantomReference);
+        keyCache = tracker.getKeyCache();
     }
 
     private static MappedByteBuffer mmap(String filename, long start, int size) throws IOException
@@ -225,7 +214,7 @@
 
     private SSTableReader(String filename, IPartitioner partitioner) throws IOException
     {
-        this(filename, partitioner, null, null, null, new InstrumentedCache<DecoratedKey, PositionSize>(0));
+        this(filename, partitioner, null, null, null);
     }
 
     public List<KeyPosition> getIndexPositions()
@@ -233,6 +222,11 @@
         return indexPositions;
     }
 
+    public long estimatedKeys()
+    {
+        return indexPositions.size() * INDEX_INTERVAL;
+    }
+
     void loadBloomFilter() throws IOException
     {
         DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
@@ -320,21 +314,29 @@
      */
     public PositionSize getPosition(DecoratedKey decoratedKey) throws IOException
     {
+        // first, check bloom filter
         if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
             return null;
-        if (keyCache.getCapacity() > 0)
+
+        // next, the key cache
+        Pair<String, DecoratedKey> unifiedKey = new Pair<String, DecoratedKey>(path, decoratedKey);
+        if (keyCache != null && keyCache.getCapacity() > 0)
         {
-            PositionSize cachedPosition = keyCache.get(decoratedKey);
+            PositionSize cachedPosition = keyCache.get(unifiedKey);
             if (cachedPosition != null)
             {
                 return cachedPosition;
             }
         }
+
+        // next, see if the sampled index says it's impossible for the key to be present
         KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
         if (sampledPosition == null)
         {
             return null;
         }
+
+        // handle exact sampled index hit
         if (spannedIndexDataPositions != null)
         {
             PositionSize info = spannedIndexDataPositions.get(sampledPosition);
@@ -342,6 +344,7 @@
                 return info;
         }
 
+        // scan the on-disk index, starting at the nearest sampled position
         long p = sampledPosition.position;
         FileDataInput input;
         if (indexBuffers == null)
@@ -383,8 +386,8 @@
                     {
                         info = new PositionSize(position, length() - position);
                     }
-                    if (keyCache.getCapacity() > 0)
-                        keyCache.put(decoratedKey, info);
+                    if (keyCache != null && keyCache.getCapacity() > 0)
+                        keyCache.put(unifiedKey, info);
                     return info;
                 }
                 if (v > 0)
@@ -502,11 +505,6 @@
         return ColumnFamily.create(getTableName(), getColumnFamilyName());
     }
 
-    public InstrumentedCache<DecoratedKey, PositionSize> getKeyCache()
-    {
-        return keyCache;
-    }
-
     public ICompactSerializer2<IColumn> getColumnSerializer()
     {
         return DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName()).equals("Standard")

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java Sat Feb 27 15:10:33 2010
@@ -25,16 +25,32 @@
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.cassandra.cache.JMXInstrumentedCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.utils.Pair;
+
+import org.apache.log4j.Logger;
 
 public class SSTableTracker implements Iterable<SSTableReader>
 {
+    private static final Logger logger = Logger.getLogger(SSTableTracker.class);
+
     private volatile Set<SSTableReader> sstables;
     private final AtomicLong liveSize = new AtomicLong();
     private final AtomicLong totalSize = new AtomicLong();
 
-    public SSTableTracker()
+    private final String ksname;
+    private final String cfname;
+
+    private final JMXInstrumentedCache<Pair<String, DecoratedKey>, SSTable.PositionSize> keyCache;
+
+    public SSTableTracker(String ksname, String cfname)
     {
-        this.sstables = Collections.<SSTableReader>emptySet();
+        this.ksname = ksname;
+        this.cfname = cfname;
+        sstables = Collections.emptySet();
+        keyCache = new JMXInstrumentedCache<Pair<String, DecoratedKey>, SSTable.PositionSize>(ksname, cfname + "KeyCache", 0);
     }
 
     public synchronized void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) throws IOException
@@ -48,7 +64,7 @@
             long size = sstable.bytesOnDisk();
             liveSize.addAndGet(size);
             totalSize.addAndGet(size);
-            sstable.addFinalizingReference(this);
+            sstable.setTrackedBy(this);
         }
 
         for (SSTableReader sstable : oldSSTables)
@@ -60,6 +76,15 @@
         }
 
         sstables = Collections.unmodifiableSet(sstablesNew);
+
+        int keyCacheSize = DatabaseDescriptor.getKeysCachedFor(ksname, cfname, estimatedKeys());
+        if (keyCacheSize != keyCache.getCapacity())
+        {   
+            // update cache size for the new key volume
+            if (logger.isDebugEnabled())
+                logger.debug("key cache capacity for " + cfname + " is " + keyCacheSize);
+            keyCache.setCapacity(keyCacheSize);
+        }
     }
 
     public synchronized void add(Iterable<SSTableReader> sstables)
@@ -107,7 +132,7 @@
         long n = 0;
         for (SSTableReader sstable : this)
         {
-            n += sstable.getIndexPositions().size() * SSTableReader.INDEX_INTERVAL;
+            n += sstable.estimatedKeys();
         }
         return n;
     }
@@ -126,5 +151,10 @@
     {
         totalSize.addAndGet(-size);
     }
+
+    public JMXInstrumentedCache<Pair<String, DecoratedKey>, SSTable.PositionSize> getKeyCache()
+    {
+        return keyCache;
+    }
 }
 

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java Sat Feb 27 15:10:33 2010
@@ -21,21 +21,23 @@
  */
 
 
-import java.io.*;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOError;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.cache.InstrumentedCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 
 public class SSTableWriter extends SSTable
 {
@@ -151,8 +153,7 @@
         rename(filterFilename());
         path = rename(path); // important to do this last since index & filter file names are derived from it
 
-        InstrumentedCache<DecoratedKey, PositionSize> keyCache = SSTableReader.createKeyCache(getTableName(), getColumnFamilyName(), keysWritten);
-        return new SSTableReader(path, partitioner, indexPositions, spannedIndexDataPositions, bf, keyCache);
+        return new SSTableReader(path, partitioner, indexPositions, spannedIndexDataPositions, bf);
     }
 
     static String rename(String tmpFilename)

Modified: incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=916958&r1=916957&r2=916958&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableTest.java Sat Feb 27 15:10:33 2010
@@ -41,7 +41,7 @@
 
         TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
         map.put(key, bytes);
-        SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", "singlewrite", map);
+        SSTableReader ssTable = SSTableUtils.writeRawSSTable("Keyspace1", "Standard1", map);
 
         // verify
         verifySingle(ssTable, bytes, key);
@@ -69,7 +69,7 @@
         }
 
         // write
-        SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", "manywrites", map);
+        SSTableReader ssTable = SSTableUtils.writeRawSSTable("Keyspace1", "Standard2", map);
 
         // verify
         verifyMany(ssTable, map);