You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/12 05:03:19 UTC

svn commit: r898175 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilyStore.java db/Memtable.java db/Table.java db/filter/NamesQueryFilter.java db/filter/QueryFilter.java db/filter/SliceQueryFilter.java io/SSTableReader.java

Author: jbellis
Date: Tue Jan 12 04:03:19 2010
New Revision: 898175

URL: http://svn.apache.org/viewvc?rev=898175&view=rev
Log:
basic row caching.
patch by jbellis; reviewed by goffinet for CASSANDRA-678

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=898175&r1=898174&r2=898175&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Jan 12 04:03:19 2010
@@ -30,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
 import org.apache.cassandra.service.SliceRange;
 import org.apache.log4j.Logger;
 
@@ -105,6 +106,8 @@
     /* active memtable associated with this ColumnFamilyStore. */
     private Memtable memtable_;
 
+    private ConcurrentLinkedHashMap<String, ColumnFamily> rowCache;
+
     // TODO binarymemtable ops are not threadsafe (do they need to be?)
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
@@ -124,6 +127,8 @@
         fileIndexGenerator_.set(indexValue);
         memtable_ = new Memtable(table_, columnFamily_);
         binaryMemtable_ = new AtomicReference<BinaryMemtable>(new BinaryMemtable(table_, columnFamily_));
+        int cacheSize = SSTableReader.estimatedKeys(columnFamilyName);
+        rowCache = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, cacheSize);
     }
 
     public static ColumnFamilyStore getColumnFamilyStore(String table, String columnFamily) throws IOException
@@ -780,6 +785,19 @@
         return getColumnFamily(filter, CompactionManager.getDefaultGCBefore());
     }
 
+    private ColumnFamily getCachedRow(QueryFilter filter) throws IOException
+    {
+        ColumnFamily cached;
+        if ((cached = rowCache.get(filter.key)) == null)
+        {
+            cached = getTopLevelColumns(new IdentityQueryFilter(filter.key, new QueryPath(columnFamily_)), Integer.MIN_VALUE);
+            if (cached == null)
+                return null;
+            rowCache.put(filter.key, cached);
+        }
+        return cached;
+    }
+
     /**
      * get a list of columns starting from a given column, in a specified order.
      * only the latest version of a column is returned.
@@ -792,19 +810,37 @@
         long start = System.currentTimeMillis();
         try
         {
-            // if we are querying subcolumns of a supercolumn, fetch the supercolumn with NQF, then filter in-memory.
             if (filter.path.superColumnName == null)
             {
-                return removeDeleted(getTopLevelColumns(filter, gcBefore), gcBefore);
+                if (rowCache == null)
+                    return removeDeleted(getTopLevelColumns(filter, gcBefore), gcBefore);
+
+                ColumnFamily cached = getCachedRow(filter);
+                ColumnIterator ci = filter.getMemColumnIterator(memtable_, cached, getComparator()); // TODO passing memtable here is confusing since it's almost entirely unused
+                ColumnFamily returnCF = ci.getColumnFamily();
+                filter.collectCollatedColumns(returnCF, ci, gcBefore);
+                return removeDeleted(returnCF);
             }
 
-            QueryFilter nameFilter = new NamesQueryFilter(filter.key, new QueryPath(columnFamily_), filter.path.superColumnName);
-            ColumnFamily cf = getTopLevelColumns(nameFilter, gcBefore);
-            if (cf == null || cf.getColumnCount() == 0)
-                return cf;
+            // we are querying subcolumns of a supercolumn: fetch the supercolumn with NQF, then filter in-memory.
+            ColumnFamily cf;
+            SuperColumn sc;
+            if (rowCache == null)
+            {
+                QueryFilter nameFilter = new NamesQueryFilter(filter.key, new QueryPath(columnFamily_), filter.path.superColumnName);
+                cf = getTopLevelColumns(nameFilter, gcBefore);
+                if (cf == null || cf.getColumnCount() == 0)
+                    return cf;
 
-            assert cf.getSortedColumns().size() == 1;
-            SuperColumn sc = (SuperColumn)cf.getSortedColumns().iterator().next();
+                assert cf.getSortedColumns().size() == 1;
+                sc = (SuperColumn)cf.getSortedColumns().iterator().next();
+            }
+            else
+            {
+                cf = getCachedRow(filter);
+                sc = (SuperColumn)cf.getColumn(filter.path.superColumnName);
+            }
+            
             SuperColumn scFiltered = filter.filterSuperColumn(sc, gcBefore);
             ColumnFamily cfFiltered = cf.cloneMeShallow();
             cfFiltered.addColumn(scFiltered);
@@ -1222,6 +1258,11 @@
         return ssTables_.size();
     }
 
+    public void invalidate(String key)
+    {
+        rowCache.remove(key);
+    }
+
     /**
      * for testing.  no effort is made to clear historical memtables.
      */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=898175&r1=898174&r2=898175&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Jan 12 04:03:19 2010
@@ -199,9 +199,8 @@
     /**
      * obtain an iterator of columns in this memtable in the specified order starting from a given column.
      */
-    public ColumnIterator getSliceIterator(SliceQueryFilter filter, AbstractType typeComparator)
+    public ColumnIterator getSliceIterator(ColumnFamily cf, SliceQueryFilter filter, AbstractType typeComparator)
     {
-        ColumnFamily cf = columnFamilies.get(partitioner.decorateKey(filter.key));
         final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table, filter.getColumnFamilyName()) : cf.cloneMeShallow();
 
         final IColumn columns[] = (cf == null ? columnFamily : cf).getSortedColumns().toArray(new IColumn[columnFamily.getSortedColumns().size()]);
@@ -252,9 +251,8 @@
         };
     }
 
-    public ColumnIterator getNamesIterator(final NamesQueryFilter filter)
+    public ColumnIterator getNamesIterator(final ColumnFamily cf, final NamesQueryFilter filter)
     {
-        final ColumnFamily cf = columnFamilies.get(partitioner.decorateKey(filter.key));
         final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table, filter.getColumnFamilyName()) : cf.cloneMeShallow();
         final boolean isStandard = DatabaseDescriptor.getColumnFamilyType(table, filter.getColumnFamilyName()).equals("Standard");
 
@@ -287,6 +285,11 @@
         };
     }
 
+    public ColumnFamily getColumnFamily(String key)
+    {
+        return columnFamilies.get(partitioner.decorateKey(key));
+    }
+
     void clearUnsafe()
     {
         columnFamilies.clear();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=898175&r1=898174&r2=898175&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Jan 12 04:03:19 2010
@@ -415,6 +415,7 @@
     {
         HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2);
 
+        // write the mutation to the commitlog and memtables
         flusherLock.readLock().lock();
         try
         {
@@ -434,7 +435,14 @@
             flusherLock.readLock().unlock();
         }
 
-        // usually mTF will be empty and this will be a no-op
+        // invalidate cache.  2nd loop over CFs here to avoid prolonging the lock section unnecessarily.
+        for (ColumnFamily cf : mutation.getColumnFamilies())
+        {
+            ColumnFamilyStore cfs = columnFamilyStores.get(cf.name());
+            cfs.invalidate(mutation.key());
+        }
+
+        // flush memtables that got filled up.  usually mTF will be empty and this will be a no-op
         for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet())
             entry.getKey().switchMemtable(entry.getValue(), writeCommitLog);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=898175&r1=898174&r2=898175&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java Tue Jan 12 04:03:19 2010
@@ -59,9 +59,9 @@
         return set;
     }
 
-    public ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator)
+    public ColumnIterator getMemColumnIterator(Memtable memtable, ColumnFamily cf, AbstractType comparator)
     {
-        return memtable.getNamesIterator(this);
+        return memtable.getNamesIterator(cf, this);
     }
 
     public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=898175&r1=898174&r2=898175&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Tue Jan 12 04:03:19 2010
@@ -46,7 +46,12 @@
      * returns an iterator that returns columns from the given memtable
      * matching the Filter criteria in sorted order.
      */
-    public abstract ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator);
+    public abstract ColumnIterator getMemColumnIterator(Memtable memtable, ColumnFamily cf, AbstractType comparator);
+
+    public ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator)
+    {
+        return getMemColumnIterator(memtable, memtable.getColumnFamily(key), comparator);
+    }
 
     /**
      * returns an iterator that returns columns from the given SSTable

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=898175&r1=898174&r2=898175&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java Tue Jan 12 04:03:19 2010
@@ -53,9 +53,9 @@
         this.count = count;
     }
 
-    public ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator)
+    public ColumnIterator getMemColumnIterator(Memtable memtable, ColumnFamily cf, AbstractType comparator)
     {
-        return memtable.getSliceIterator(this, comparator);
+        return memtable.getSliceIterator(cf, this, comparator);
     }
 
     public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=898175&r1=898174&r2=898175&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Tue Jan 12 04:03:19 2010
@@ -117,6 +117,17 @@
         return count;
     }
 
+    public static int estimatedKeys(String columnFamilyName)
+    {
+        int n = 0;
+        for (SSTableReader sstable : openedFiles.values())
+        {
+            if (sstable.getColumnFamilyName().equals(columnFamilyName))
+                n += sstable.getIndexPositions().size() * INDEX_INTERVAL;
+        }
+        return n;
+    }
+
     /**
      * Get all indexed keys defined by the two predicates.
      * @param cfpred A Predicate defining matching column families.