You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/04/18 12:59:39 UTC

svn commit: r1094484 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/s...

Author: slebresne
Date: Mon Apr 18 10:59:39 2011
New Revision: 1094484

URL: http://svn.apache.org/viewvc?rev=1094484&view=rev
Log:
Merge CASSANDRA-2420 from 0.8

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 18 10:59:39 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
 /cassandra/branches/cassandra-0.7:1026516-1091087,1091503,1091542,1091654
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090935-1094085
+/cassandra/branches/cassandra-0.8:1090935-1094085,1094481
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 18 10:59:39 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1091087,1091503,1091542,1091654
+/cassandra/branches/cassandra-0.7/contrib:1026516-1091087,1091503,1091542,1091654,1094481
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090935-1094085
+/cassandra/branches/cassandra-0.8/contrib:1090935-1094085,1094481
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/contrib:810145-810987,810994-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 18 10:59:39 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1091087,1091503,1091542,1091654
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1091087,1091503,1091542,1091654,1094481
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1094085
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1094085,1094481
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 18 10:59:39 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1091087,1091503,1091542,1091654
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1091087,1091503,1091542,1091654,1094481
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1094085
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1094085,1094481
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 18 10:59:39 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1091087,1091503,1091542,1091654
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1091087,1091503,1091542,1091654,1094481
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1094085
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1094085,1094481
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 18 10:59:39 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1091087,1091503,1091542,1091654
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1091087,1091503,1091542,1091654,1094481
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1094085
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1094085,1094481
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 18 10:59:39 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1091087,1091503,1091542,1091654
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1091087,1091503,1091542,1091654,1094481
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1094085
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1094085,1094481
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094484&r1=1094483&r2=1094484&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 18 10:59:39 2011
@@ -735,6 +735,20 @@ public class ColumnFamilyStore implement
         submitFlush(binaryMemtable.get(), new CountDownLatch(1));
     }
 
+    public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily)
+    {
+        if (rowCache.isPutCopying())
+        {
+            invalidateCachedRow(key);
+        }
+        else
+        {
+            ColumnFamily cachedRow = getRawCachedRow(key);
+            if (cachedRow != null)
+                cachedRow.addAll(columnFamily);
+        }
+    }
+
     /**
      * Insert/Update the column family for this key.
      * Caller is responsible for acquiring Table.flusherLock!
@@ -749,17 +763,8 @@ public class ColumnFamilyStore implement
         Memtable mt = getMemtableThreadSafe();
         boolean flushRequested = mt.isThresholdViolated();
         mt.put(key, columnFamily);
-        if (rowCache.isPutCopying())
-        {
-            invalidateCachedRow(key);
-        }
-        else
-        {
-            ColumnFamily cachedRow = getRawCachedRow(key);
-            if (cachedRow != null)
-                cachedRow.addAll(columnFamily);
-            writeStats.addNano(System.nanoTime() - start);
-        }
+        updateRowCache(key, columnFamily);
+        writeStats.addNano(System.nanoTime() - start);
 
         if (DatabaseDescriptor.estimatesRealMemtableSize())
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1094484&r1=1094483&r2=1094484&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Mon Apr 18 10:59:39 2011
@@ -131,4 +131,15 @@ public class PrecompactedRow extends Abs
     {
         return compactedCf == null ? 0 : compactedCf.getColumnCount();
     }
+
+    /**
+     * @return the full column family represented by this compacted row.
+     *
+     * We do not provide this method for other AbstractCompactedRow, because this fits the whole row into
+     * memory and doesn't make sense for those other implementations.
+     */
+    public ColumnFamily getFullColumnFamily()  throws IOException
+    {
+        return compactedCf;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1094484&r1=1094483&r2=1094484&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Apr 18 10:59:39 2011
@@ -285,9 +285,9 @@ public class SSTableWriter extends SSTab
             try
             {
                 if (cfs.metadata.getDefaultValidator().isCommutative())
-                    indexer = new CommutativeRowIndexer(desc, cfs.metadata);
+                    indexer = new CommutativeRowIndexer(desc, cfs, type);
                 else
-                    indexer = new RowIndexer(desc, cfs.metadata);
+                    indexer = new RowIndexer(desc, cfs, type);
             }
             catch (IOException e)
             {
@@ -320,20 +320,22 @@ public class SSTableWriter extends SSTab
     {
         protected final Descriptor desc;
         public final BufferedRandomAccessFile dfile;
+        private final OperationType type;
 
         protected IndexWriter iwriter;
-        protected CFMetaData metadata;
+        protected ColumnFamilyStore cfs;
 
-        RowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
+        RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
-            this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata);
+            this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
         }
 
-        protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, CFMetaData metadata) throws IOException
+        protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
             this.desc = desc;
             this.dfile = dfile;
-            this.metadata = metadata;
+            this.type = type;
+            this.cfs = cfs;
         }
 
         long prepareIndexing() throws IOException
@@ -377,6 +379,53 @@ public class SSTableWriter extends SSTab
             iwriter.close();
         }
 
+        /*
+         * If the key is cached, we should:
+         *   - For AES: run the newly received row by the cache
+         *   - For other: invalidate the cache (even if very unlikely, a key could be in cache in theory if a neighbor was bootstrapped and
+         *     then removed quickly afterward (a key that we had lost but became responsible for again could have stayed in cache). That key
+         *     would be obsolete and so we must invalidate the cache).
+         */
+        protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException
+        {
+            ColumnFamily cached = cfs.getRawCachedRow(key);
+            if (cached != null)
+            {
+                switch (type)
+                {
+                    case AES:
+                        if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+                        {
+                            // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
+                            // from being built (and there is no real point anyway), so we just invalidate the row for correctness and log a warning.
+                            logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+                            cfs.invalidateCachedRow(key);
+                        }
+                        else
+                        {
+                            ColumnFamily cf;
+                            if (row == null)
+                            {
+                                // If not provided, read from disk.
+                                cf = ColumnFamily.create(cfs.metadata);
+                                ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true);
+                            }
+                            else
+                            {
+                                assert row instanceof PrecompactedRow;
+                                // we do not purge so we should not get a null here
+                                cf = ((PrecompactedRow)row).getFullColumnFamily();
+                            }
+                            cfs.updateRowCache(key, cf);
+                        }
+                        break;
+                    default:
+                        cfs.invalidateCachedRow(key);
+                        break;
+                }
+            }
+        }
+
         protected long doIndexing() throws IOException
         {
             EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
@@ -393,10 +442,14 @@ public class SSTableWriter extends SSTab
                 // seek to next key
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
                 rowPosition = dfile.getFilePointer() + dataSize;
-                
+
                 IndexHelper.skipBloomFilter(dfile);
                 IndexHelper.skipIndex(dfile);
-                ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(metadata), dfile);
+                ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile);
+
+                // don't move that statement around, it expects the dfile to be before the columns
+                updateCache(key, dataSize, null);
+
                 rowSizes.add(dataSize);
                 columnCounts.add(dfile.readInt());
                 
@@ -424,9 +477,9 @@ public class SSTableWriter extends SSTab
     {
         protected BufferedRandomAccessFile writerDfile;
 
-        CommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
+        CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
-            super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata);
+            super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
             writerDfile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true);
         }
 
@@ -448,7 +501,7 @@ public class SSTableWriter extends SSTab
 
                 // skip data size, bloom filter, column index
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
-                SSTableIdentityIterator iter = new SSTableIdentityIterator(metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
+                SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
 
                 AbstractCompactedRow row;
                 if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
@@ -461,6 +514,8 @@ public class SSTableWriter extends SSTab
                     row = new PrecompactedRow(controller, Collections.singletonList(iter));
                 }
 
+                updateCache(key, dataSize, row);
+
                 rowSizes.add(dataSize);
                 columnCounts.add(row.columnCount());
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1094484&r1=1094483&r2=1094484&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java Mon Apr 18 10:59:39 2011
@@ -23,8 +23,6 @@ package org.apache.cassandra.streaming;
  */
 public enum OperationType
 {
-    // TODO: the only types of operation that are currently distinguised are AES and everything else.  There is no
-    // sense in having the other types (yet).
     AES,
     BOOTSTRAP,
     UNBOOTSTRAP,