You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/04/18 12:46:26 UTC

svn commit: r1094481 - in /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra: db/ColumnFamilyStore.java io/PrecompactedRow.java io/sstable/SSTableWriter.java streaming/OperationType.java

Author: slebresne
Date: Mon Apr 18 10:46:26 2011
New Revision: 1094481

URL: http://svn.apache.org/viewvc?rev=1094481&view=rev
Log:
Update row cache post streaming
patch by slebresne; reviewed by jbellis for CASSANDRA-2420

Modified:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 18 10:46:26 2011
@@ -735,6 +735,20 @@ public class ColumnFamilyStore implement
         submitFlush(binaryMemtable.get(), new CountDownLatch(1));
     }
 
+    public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily)
+    {
+        if (rowCache.isPutCopying())
+        {
+            invalidateCachedRow(key);
+        }
+        else
+        {
+            ColumnFamily cachedRow = getRawCachedRow(key);
+            if (cachedRow != null)
+                cachedRow.addAll(columnFamily);
+        }
+    }
+
     /**
      * Insert/Update the column family for this key.
      * Caller is responsible for acquiring Table.flusherLock!
@@ -749,17 +763,8 @@ public class ColumnFamilyStore implement
         Memtable mt = getMemtableThreadSafe();
         boolean flushRequested = mt.isThresholdViolated();
         mt.put(key, columnFamily);
-        if (rowCache.isPutCopying())
-        {
-            invalidateCachedRow(key);
-        }
-        else
-        {
-            ColumnFamily cachedRow = getRawCachedRow(key);
-            if (cachedRow != null)
-                cachedRow.addAll(columnFamily);
-            writeStats.addNano(System.nanoTime() - start);
-        }
+        updateRowCache(key, columnFamily);
+        writeStats.addNano(System.nanoTime() - start);
 
         if (DatabaseDescriptor.estimatesRealMemtableSize())
         {

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java Mon Apr 18 10:46:26 2011
@@ -131,4 +131,15 @@ public class PrecompactedRow extends Abs
     {
         return compactedCf == null ? 0 : compactedCf.getColumnCount();
     }
+
+    /**
+     * @return the full column family represented by this compacted row.
+     *
+     * We do not provide this method for other AbstractCompactedRow implementations, because it fits the whole row
+     * into memory and does not make sense for those other implementations.
+     */
+    public ColumnFamily getFullColumnFamily()  throws IOException
+    {
+        return compactedCf;
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Apr 18 10:46:26 2011
@@ -285,9 +285,9 @@ public class SSTableWriter extends SSTab
             try
             {
                 if (cfs.metadata.getDefaultValidator().isCommutative())
-                    indexer = new CommutativeRowIndexer(desc, cfs.metadata);
+                    indexer = new CommutativeRowIndexer(desc, cfs, type);
                 else
-                    indexer = new RowIndexer(desc, cfs.metadata);
+                    indexer = new RowIndexer(desc, cfs, type);
             }
             catch (IOException e)
             {
@@ -320,20 +320,22 @@ public class SSTableWriter extends SSTab
     {
         protected final Descriptor desc;
         public final BufferedRandomAccessFile dfile;
+        private final OperationType type;
 
         protected IndexWriter iwriter;
-        protected CFMetaData metadata;
+        protected ColumnFamilyStore cfs;
 
-        RowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
+        RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
-            this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata);
+            this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
         }
 
-        protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, CFMetaData metadata) throws IOException
+        protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
             this.desc = desc;
             this.dfile = dfile;
-            this.metadata = metadata;
+            this.type = type;
+            this.cfs = cfs;
         }
 
         long prepareIndexing() throws IOException
@@ -377,6 +379,53 @@ public class SSTableWriter extends SSTab
             iwriter.close();
         }
 
+        /*
+         * If the key is cached, we should:
+         *   - For AES: run the newly received row by the cache
+         *   - For other: invalidate the cache. Even if very unlikely, a key could in theory be in cache if a neighbor was bootstrapped
+         *     and then removed quickly afterward: a key that we had lost but became responsible for again could have stayed in cache.
+         *     That key would be obsolete, so we must invalidate the cache.
+         */
+        protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException
+        {
+            ColumnFamily cached = cfs.getRawCachedRow(key);
+            if (cached != null)
+            {
+                switch (type)
+                {
+                    case AES:
+                        if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+                        {
+                            // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
+                            // from being built (and there is no real point anyway), so we just invalidate the row for correctness and log a warning.
+                            logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+                            cfs.invalidateCachedRow(key);
+                        }
+                        else
+                        {
+                            ColumnFamily cf;
+                            if (row == null)
+                            {
+                                // If not provided, read from disk.
+                                cf = ColumnFamily.create(cfs.metadata);
+                                ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true);
+                            }
+                            else
+                            {
+                                assert row instanceof PrecompactedRow;
+                                // we do not purge so we should not get a null here
+                                cf = ((PrecompactedRow)row).getFullColumnFamily();
+                            }
+                            cfs.updateRowCache(key, cf);
+                        }
+                        break;
+                    default:
+                        cfs.invalidateCachedRow(key);
+                        break;
+                }
+            }
+        }
+
         protected long doIndexing() throws IOException
         {
             EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
@@ -393,10 +442,14 @@ public class SSTableWriter extends SSTab
                 // seek to next key
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
                 rowPosition = dfile.getFilePointer() + dataSize;
-                
+
                 IndexHelper.skipBloomFilter(dfile);
                 IndexHelper.skipIndex(dfile);
-                ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(metadata), dfile);
+                ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile);
+
+                // don't move that statement around, it expects the dfile to be before the columns
+                updateCache(key, dataSize, null);
+
                 rowSizes.add(dataSize);
                 columnCounts.add(dfile.readInt());
                 
@@ -424,9 +477,9 @@ public class SSTableWriter extends SSTab
     {
         protected BufferedRandomAccessFile writerDfile;
 
-        CommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
+        CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
-            super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata);
+            super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
             writerDfile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true);
         }
 
@@ -448,7 +501,7 @@ public class SSTableWriter extends SSTab
 
                 // skip data size, bloom filter, column index
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
-                SSTableIdentityIterator iter = new SSTableIdentityIterator(metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
+                SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
 
                 AbstractCompactedRow row;
                 if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
@@ -461,6 +514,8 @@ public class SSTableWriter extends SSTab
                     row = new PrecompactedRow(controller, Collections.singletonList(iter));
                 }
 
+                updateCache(key, dataSize, row);
+
                 rowSizes.add(dataSize);
                 columnCounts.add(row.columnCount());
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java Mon Apr 18 10:46:26 2011
@@ -23,8 +23,6 @@ package org.apache.cassandra.streaming;
  */
 public enum OperationType
 {
-    // TODO: the only types of operation that are currently distinguised are AES and everything else.  There is no
-    // sense in having the other types (yet).
     AES,
     BOOTSTRAP,
     UNBOOTSTRAP,