You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/04/18 12:46:26 UTC
svn commit: r1094481 - in
/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra:
db/ColumnFamilyStore.java io/PrecompactedRow.java
io/sstable/SSTableWriter.java streaming/OperationType.java
Author: slebresne
Date: Mon Apr 18 10:46:26 2011
New Revision: 1094481
URL: http://svn.apache.org/viewvc?rev=1094481&view=rev
Log:
Update row cache post streaming
patch by slebresne; reviewed by jbellis for CASSANDRA-2420
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 18 10:46:26 2011
@@ -735,6 +735,20 @@ public class ColumnFamilyStore implement
submitFlush(binaryMemtable.get(), new CountDownLatch(1));
}
+ /**
+ * Reflects a write of {@code columnFamily} for {@code key} in the row cache.
+ *
+ * When the row cache copies values on put ({@code rowCache.isPutCopying()}), the cached entry for the key
+ * is simply invalidated (NOTE(review): presumably because merging into a copy would not update the cached
+ * entry -- confirm against the cache implementation). Otherwise, if the key is cached, the new columns are
+ * merged into the cached row in place; an uncached key is left untouched.
+ */
+ public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily)
+ {
+ if (!rowCache.isPutCopying())
+ {
+ ColumnFamily cached = getRawCachedRow(key);
+ if (cached != null)
+ {
+ cached.addAll(columnFamily);
+ }
+ return;
+ }
+ invalidateCachedRow(key);
+ }
+
/**
* Insert/Update the column family for this key.
* Caller is responsible for acquiring Table.flusherLock!
@@ -749,17 +763,8 @@ public class ColumnFamilyStore implement
Memtable mt = getMemtableThreadSafe();
boolean flushRequested = mt.isThresholdViolated();
mt.put(key, columnFamily);
- if (rowCache.isPutCopying())
- {
- invalidateCachedRow(key);
- }
- else
- {
- ColumnFamily cachedRow = getRawCachedRow(key);
- if (cachedRow != null)
- cachedRow.addAll(columnFamily);
- writeStats.addNano(System.nanoTime() - start);
- }
+ updateRowCache(key, columnFamily);
+ writeStats.addNano(System.nanoTime() - start);
if (DatabaseDescriptor.estimatesRealMemtableSize())
{
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java Mon Apr 18 10:46:26 2011
@@ -131,4 +131,15 @@ public class PrecompactedRow extends Abs
{
return compactedCf == null ? 0 : compactedCf.getColumnCount();
}
+
+ /**
+ * Returns the complete column family this row was compacted into.
+ *
+ * Only PrecompactedRow exposes this: it already holds the whole merged row in memory, which would not make
+ * sense for the other AbstractCompactedRow implementations.
+ *
+ * @return the compacted column family; may be null (columnCount() treats a null column family as 0 columns),
+ * e.g. when the row was purged during compaction
+ * @throws IOException never thrown by this implementation; declared for signature compatibility
+ */
+ public ColumnFamily getFullColumnFamily() throws IOException
+ {
+ return this.compactedCf;
+ }
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Apr 18 10:46:26 2011
@@ -285,9 +285,9 @@ public class SSTableWriter extends SSTab
try
{
if (cfs.metadata.getDefaultValidator().isCommutative())
- indexer = new CommutativeRowIndexer(desc, cfs.metadata);
+ indexer = new CommutativeRowIndexer(desc, cfs, type);
else
- indexer = new RowIndexer(desc, cfs.metadata);
+ indexer = new RowIndexer(desc, cfs, type);
}
catch (IOException e)
{
@@ -320,20 +320,22 @@ public class SSTableWriter extends SSTab
{
protected final Descriptor desc;
public final BufferedRandomAccessFile dfile;
+ private final OperationType type;
protected IndexWriter iwriter;
- protected CFMetaData metadata;
+ protected ColumnFamilyStore cfs;
- RowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
+ RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
{
- this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata);
+ this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
}
- protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, CFMetaData metadata) throws IOException
+ protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, ColumnFamilyStore cfs, OperationType type) throws IOException
{
this.desc = desc;
this.dfile = dfile;
- this.metadata = metadata;
+ this.type = type;
+ this.cfs = cfs;
}
long prepareIndexing() throws IOException
@@ -377,6 +379,53 @@ public class SSTableWriter extends SSTab
iwriter.close();
}
+ /*
+ * Brings the row cache in line with a freshly streamed row, if the key is currently cached:
+ * - For AES: run the newly received row through the cache (merge it into the cached row).
+ * - For any other operation type: invalidate the cached row. Even if very unlikely, a key could in theory be
+ *   cached if a neighbor was bootstrapped and then removed quickly afterward (a key that we had lost but became
+ *   responsible for again could have stayed in the cache). That cached row would be obsolete, so we must
+ *   invalidate it.
+ *
+ * When row is null, the columns are read directly from dfile, which must be positioned right before the row's
+ * column count. The file pointer is restored before returning, so callers (e.g. doIndexing, which reads the
+ * column count right after this call) still find dfile positioned before the columns.
+ */
+ protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException
+ {
+ ColumnFamily cached = cfs.getRawCachedRow(key);
+ if (cached != null)
+ {
+ switch (type)
+ {
+ case AES:
+ if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+ {
+ // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
+ // from being built (and there is no real point anyway), so we just invalidate the row for correctness and log a warning.
+ logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+ cfs.invalidateCachedRow(key);
+ }
+ else
+ {
+ ColumnFamily cf;
+ if (row == null)
+ {
+ // If not provided, read from disk. Deserializing consumes the column count and all the columns,
+ // so remember where they start and rewind afterwards: the caller reads them again itself.
+ long columnsStart = dfile.getFilePointer();
+ cf = ColumnFamily.create(cfs.metadata);
+ ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true);
+ dfile.seek(columnsStart);
+ }
+ else
+ {
+ // Rows over the in-memory compaction limit were handled above, so only fully in-memory rows reach here.
+ assert row instanceof PrecompactedRow;
+ // we do not purge so we should not get a null here
+ cf = ((PrecompactedRow)row).getFullColumnFamily();
+ }
+ cfs.updateRowCache(key, cf);
+ }
+ break;
+ default:
+ cfs.invalidateCachedRow(key);
+ break;
+ }
+ }
+ }
+
protected long doIndexing() throws IOException
{
EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
@@ -393,10 +442,14 @@ public class SSTableWriter extends SSTab
// seek to next key
long dataSize = SSTableReader.readRowSize(dfile, desc);
rowPosition = dfile.getFilePointer() + dataSize;
-
+
IndexHelper.skipBloomFilter(dfile);
IndexHelper.skipIndex(dfile);
- ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(metadata), dfile);
+ ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile);
+
+ // don't move that statement around, it expects the dfile to be before the columns
+ updateCache(key, dataSize, null);
+
rowSizes.add(dataSize);
columnCounts.add(dfile.readInt());
@@ -424,9 +477,9 @@ public class SSTableWriter extends SSTab
{
protected BufferedRandomAccessFile writerDfile;
- CommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
+ CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
{
- super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata);
+ super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
writerDfile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true);
}
@@ -448,7 +501,7 @@ public class SSTableWriter extends SSTab
// skip data size, bloom filter, column index
long dataSize = SSTableReader.readRowSize(dfile, desc);
- SSTableIdentityIterator iter = new SSTableIdentityIterator(metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
+ SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
AbstractCompactedRow row;
if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
@@ -461,6 +514,8 @@ public class SSTableWriter extends SSTab
row = new PrecompactedRow(controller, Collections.singletonList(iter));
}
+ updateCache(key, dataSize, row);
+
rowSizes.add(dataSize);
columnCounts.add(row.columnCount());
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1094481&r1=1094480&r2=1094481&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java Mon Apr 18 10:46:26 2011
@@ -23,8 +23,6 @@ package org.apache.cassandra.streaming;
*/
public enum OperationType
{
- // TODO: the only types of operation that are currently distinguised are AES and everything else. There is no
- // sense in having the other types (yet).
AES,
BOOTSTRAP,
UNBOOTSTRAP,