You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/22 15:53:02 UTC

svn commit: r1073352 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/db/CompactionManager.java

Author: jbellis
Date: Tue Feb 22 14:53:01 2011
New Revision: 1073352

URL: http://svn.apache.org/viewvc?rev=1073352&view=rev
Log:
fix for cleanup writing old-format data into new-version sstable
patch by jbellis; reviewed by slebresne for CASSANDRA-2211

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1073352&r1=1073351&r2=1073352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Feb 22 14:53:01 2011
@@ -12,6 +12,8 @@
  * validate index names for \w+ (CASSANDRA-2196)
  * Fix Cassandra cli to respect timeout if schema does not settle (CASSANDRA-2187)
  * update memtable_throughput to be a long (CASSANDRA-2158)
+ * fix for cleanup writing old-format data into new-version sstable 
+   (CASSANRDRA-2211)
 
 
 0.7.2

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1073352&r1=1073351&r2=1073352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Tue Feb 22 14:53:01 2011
@@ -41,9 +41,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.AbstractCompactedRow;
-import org.apache.cassandra.io.CompactionIterator;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.AntiEntropyService;
@@ -119,7 +117,7 @@ public class CompactionManager implement
                             Collections.sort(sstables);
                             int gcBefore = cfs.isIndex()
                                          ? Integer.MAX_VALUE
-                                         : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+                                         : getDefaultGcBefore(cfs);
                             return doCompaction(cfs,
                                                 sstables.subList(0, Math.min(sstables.size(), maxThreshold)),
                                                 gcBefore);
@@ -183,7 +181,7 @@ public class CompactionManager implement
 
     public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
-        submitMajor(cfStore, 0, (int) (System.currentTimeMillis() / 1000) - cfStore.metadata.getGcGraceSeconds()).get();
+        submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
     }
 
     public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
@@ -256,7 +254,7 @@ public class CompactionManager implement
         }
 
         ColumnFamilyStore cfs = Table.open(ksname).getColumnFamilyStore(cfname);
-        submitUserDefined(cfs, descriptors, (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds());
+        submitUserDefined(cfs, descriptors, getDefaultGcBefore(cfs));
     }
 
     private Future<Object> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
@@ -515,7 +513,7 @@ public class CompactionManager implement
                     if (Range.isTokenInRanges(row.getKey().token, ranges))
                     {
                         writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
-                        writer.append(new EchoedRow(row));
+                        writer.append(getCompactedRow(row, cfs, sstable.descriptor));
                         totalkeysWritten++;
                     }
                     else
@@ -568,6 +566,21 @@ public class CompactionManager implement
         }
     }
 
+    /**
+     * @return an AbstractCompactedRow implementation to write the row in question.
+     * If the data is from a current-version sstable, write it unchanged.  Otherwise,
+     * re-serialize it in the latest version.
+     */
+    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor)
+    {
+        if (descriptor.isLatestVersion)
+            return new EchoedRow(row);
+
+        return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
+               ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs))
+               : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs));
+    }
+
     private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
             throws IOException
     {
@@ -752,11 +765,16 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
+    private static int getDefaultGcBefore(ColumnFamilyStore cfs)
+    {
+        return (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+    }
+
     private static class ValidationCompactionIterator extends CompactionIterator
     {
         public ValidationCompactionIterator(ColumnFamilyStore cfs) throws IOException
         {
-            super(cfs, cfs.getSSTables(), (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(), true);
+            super(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true);
         }
 
         @Override