You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/22 15:53:02 UTC
svn commit: r1073352 - in /cassandra/branches/cassandra-0.7: CHANGES.txt
src/java/org/apache/cassandra/db/CompactionManager.java
Author: jbellis
Date: Tue Feb 22 14:53:01 2011
New Revision: 1073352
URL: http://svn.apache.org/viewvc?rev=1073352&view=rev
Log:
fix for cleanup writing old-format data into new-version sstable
patch by jbellis; reviewed by slebresne for CASSANDRA-2211
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1073352&r1=1073351&r2=1073352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Feb 22 14:53:01 2011
@@ -12,6 +12,8 @@
* validate index names for \w+ (CASSANDRA-2196)
* Fix Cassandra cli to respect timeout if schema does not settle (CASSANDRA-2187)
* update memtable_throughput to be a long (CASSANDRA-2158)
+ * fix for cleanup writing old-format data into new-version sstable
+ (CASSANDRA-2211)
0.7.2
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1073352&r1=1073351&r2=1073352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Tue Feb 22 14:53:01 2011
@@ -41,9 +41,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.AbstractCompactedRow;
-import org.apache.cassandra.io.CompactionIterator;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.AntiEntropyService;
@@ -119,7 +117,7 @@ public class CompactionManager implement
Collections.sort(sstables);
int gcBefore = cfs.isIndex()
? Integer.MAX_VALUE
- : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ : getDefaultGcBefore(cfs);
return doCompaction(cfs,
sstables.subList(0, Math.min(sstables.size(), maxThreshold)),
gcBefore);
@@ -183,7 +181,7 @@ public class CompactionManager implement
public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
- submitMajor(cfStore, 0, (int) (System.currentTimeMillis() / 1000) - cfStore.metadata.getGcGraceSeconds()).get();
+ submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
}
public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
@@ -256,7 +254,7 @@ public class CompactionManager implement
}
ColumnFamilyStore cfs = Table.open(ksname).getColumnFamilyStore(cfname);
- submitUserDefined(cfs, descriptors, (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds());
+ submitUserDefined(cfs, descriptors, getDefaultGcBefore(cfs));
}
private Future<Object> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
@@ -515,7 +513,7 @@ public class CompactionManager implement
if (Range.isTokenInRanges(row.getKey().token, ranges))
{
writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
- writer.append(new EchoedRow(row));
+ writer.append(getCompactedRow(row, cfs, sstable.descriptor));
totalkeysWritten++;
}
else
@@ -568,6 +566,21 @@ public class CompactionManager implement
}
}
+ /**
+ * @return an AbstractCompactedRow implementation to write the row in question.
+ * If the data is from a current-version sstable, write it unchanged. Otherwise,
+ * re-serialize it in the latest version.
+ */
+ private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor)
+ {
+ if (descriptor.isLatestVersion)
+ return new EchoedRow(row);
+
+ return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
+ ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs))
+ : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs));
+ }
+
private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
throws IOException
{
@@ -752,11 +765,16 @@ public class CompactionManager implement
return executor.submit(runnable);
}
+ private static int getDefaultGcBefore(ColumnFamilyStore cfs)
+ {
+ return (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ }
+
private static class ValidationCompactionIterator extends CompactionIterator
{
public ValidationCompactionIterator(ColumnFamilyStore cfs) throws IOException
{
- super(cfs, cfs.getSSTables(), (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(), true);
+ super(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true);
}
@Override