You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/23 04:44:26 UTC

svn commit: r1073586 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/ src/java/org/apache/cassandra/service/ ...

Author: jbellis
Date: Wed Feb 23 03:44:26 2011
New Revision: 1073586

URL: http://svn.apache.org/viewvc?rev=1073586&view=rev
Log:
nodetool scrub
patch by jbellis and tjake; reviewed by slebresne for CASSANDRA-2217

Added:
    cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/
    cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db   (with props)
    cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db   (with props)
    cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db   (with props)
    cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db   (with props)
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/build.xml
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
    cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Feb 23 03:44:26 2011
@@ -15,6 +15,7 @@
  * update memtable_throughput to be a long (CASSANDRA-2158)
  * fix for compaction and cleanup writing old-format data into new-version 
    sstable (CASSANDRA-2211, -2216)
+ * add nodetool scrub (CASSANDRA-2217)
  * fix sstable2json large-row pagination (CASSANDRA-2188)
  * fix EOFing on requests for the last bytes in a file (CASSANDRA-2213)
  * fix BRAF performance when seeking to EOF (CASSANDRA-2218)

Modified: cassandra/branches/cassandra-0.7/build.xml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/build.xml?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/build.xml (original)
+++ cassandra/branches/cassandra-0.7/build.xml Wed Feb 23 03:44:26 2011
@@ -561,6 +561,7 @@
   <target name="test" depends="build-test" description="Execute unit tests">
     <testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000">
       <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+      <jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
     </testmacro>
   </target>
     

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Feb 23 03:44:26 2011
@@ -928,6 +928,12 @@ public class ColumnFamilyStore implement
         CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
     }
 
+    public void scrub() throws ExecutionException, InterruptedException
+    {
+        snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
+        CompactionManager.instance.performScrub(ColumnFamilyStore.this);
+    }
+
     void markCompacted(Collection<SSTableReader> sstables)
     {
         ssTables.markCompacted(sstables);
@@ -1601,26 +1607,8 @@ public class ColumnFamilyStore implement
         return metadata.comparator;
     }
 
-    /**
-     * Take a snap shot of this columnfamily store.
-     * 
-     * @param snapshotName the name of the associated with the snapshot 
-     */
-    public void snapshot(String snapshotName)
+    private void snapshotWithoutFlush(String snapshotName)
     {
-        try
-        {
-            forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-
         for (SSTableReader ssTable : ssTables)
         {
             try
@@ -1643,6 +1631,30 @@ public class ColumnFamilyStore implement
         }
     }
 
+
+    /**
+     * Take a snap shot of this columnfamily store.
+     * 
+     * @param snapshotName the name of the associated with the snapshot 
+     */
+    public void snapshot(String snapshotName)
+    {
+        try
+        {
+            forceBlockingFlush();
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        snapshotWithoutFlush(snapshotName);
+    }
+
     public boolean hasUnreclaimedSpace()
     {
         return ssTables.getLiveSize() < ssTables.getTotalSize();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Wed Feb 23 03:44:26 2011
@@ -43,9 +43,11 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -179,6 +181,28 @@ public class CompactionManager implement
         executor.submit(runnable).get();
     }
 
+    public void performScrub(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
+    {
+        Callable<Object> runnable = new Callable<Object>()
+        {
+            public Object call() throws IOException
+            {
+                compactionLock.lock();
+                try
+                {
+                    if (!cfStore.isInvalid())
+                        doScrub(cfStore);
+                    return this;
+                }
+                finally
+                {
+                    compactionLock.unlock();
+                }
+            }
+        };
+        executor.submit(runnable).get();
+    }
+
     public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
         submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
@@ -473,6 +497,101 @@ public class CompactionManager implement
     }
 
     /**
+     * Deserialize everything in the CFS and re-serialize w/ the newest version.  Also attempts to recover
+     * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
+     * from early ByteBuffer bugs.
+     *
+     * @throws IOException
+     */
+    private void doScrub(ColumnFamilyStore cfs) throws IOException
+    {
+        assert !cfs.isIndex();
+        Table table = cfs.table;
+        Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
+
+        for (final SSTableReader sstable : cfs.getSSTables())
+        {
+            logger.info("Scrubbing " + sstable);
+
+            // Calculate the expected compacted filesize
+            String compactionFileLocation = table.getDataFileLocation(sstable.length());
+            if (compactionFileLocation == null)
+                throw new IOException("disk full");
+
+            int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
+                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+            if (logger.isDebugEnabled())
+              logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+
+            // loop through each row, deserializing to check for damage.
+            // we'll also loop through the index at the same time, using the position from the index to recover if the
+            // row header (key or data size) is corrupt. (This means our position in the index file will be one row
+            // "ahead" of the data file.)
+            final BufferedRandomAccessFile dataFile = BufferedRandomAccessFile.getUncachingReader(sstable.getFilename());
+            String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
+            BufferedRandomAccessFile indexFile = BufferedRandomAccessFile.getUncachingReader(indexFilename);
+            ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+            assert indexFile.readLong() == 0;
+
+            SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null);
+            executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile, sstable));
+
+            while (!dataFile.isEOF())
+            {
+                long rowStart = dataFile.getFilePointer();
+                DecoratedKey key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                ByteBuffer currentIndexKey = nextIndexKey;
+                nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+                long nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+
+                long dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+                long dataStart = dataFile.getFilePointer();
+
+                SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                writer.mark();
+                try
+                {
+                    writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+                }
+                catch (Exception e)
+                {
+                    logger.warn("Error reading row " + ByteBufferUtil.bytesToHex(key.key) + "(stacktrace follows)", e);
+                    writer.reset();
+                    
+                    long dataStartFromIndex = rowStart + 2 + currentIndexKey.remaining();
+                    if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+                    {
+                        logger.info(String.format("Retrying %s as key %s from row index",
+                                                  ByteBufferUtil.bytesToHex(key.key), ByteBufferUtil.bytesToHex(currentIndexKey)));
+                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+                        long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                        row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                        try
+                        {
+                            writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+                        }
+                        catch (Exception e2)
+                        {
+                            logger.info("Retry failed too.  Skipping to next row (retry's stacktrace follows)", e2);
+                            writer.reset();
+                            dataFile.seek(nextRowPositionFromIndex);
+                        }
+                    }
+                    else
+                    {
+                        logger.info("Skipping to next row");
+                        dataFile.seek(nextRowPositionFromIndex);
+                    }
+                }
+            }
+
+            SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+            logger.info("Scrub of " + sstable + " complete");
+        }
+    }
+
+    /**
      * This function goes over each file and removes the keys that the node is not responsible for
      * and only keeps keys that this node is responsible for.
      *
@@ -497,7 +616,7 @@ public class CompactionManager implement
             long totalkeysWritten = 0;
 
             int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
-                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)) / 2));
+                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
             if (logger.isDebugEnabled())
               logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
@@ -513,7 +632,7 @@ public class CompactionManager implement
                     if (Range.isTokenInRanges(row.getKey().token, ranges))
                     {
                         writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
-                        writer.append(getCompactedRow(row, cfs, sstable.descriptor));
+                        writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
                         totalkeysWritten++;
                     }
                     else
@@ -571,14 +690,14 @@ public class CompactionManager implement
      * If the data is from a current-version sstable, write it unchanged.  Otherwise,
      * re-serialize it in the latest version.
      */
-    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor)
+    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor, boolean forceDeserialize)
     {
-        if (descriptor.isLatestVersion)
+        if (descriptor.isLatestVersion && !forceDeserialize)
             return new EchoedRow(row);
 
         return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
-               ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs))
-               : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs));
+               ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize)
+               : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize);
     }
 
     private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
@@ -983,4 +1102,38 @@ public class CompactionManager implement
             return "Cleanup of " + sstable.getColumnFamilyName();
         }
     }
+
+    private static class ScrubInfo implements ICompactionInfo
+    {
+        private final BufferedRandomAccessFile dataFile;
+        private final SSTableReader sstable;
+
+        public ScrubInfo(BufferedRandomAccessFile dataFile, SSTableReader sstable)
+        {
+            this.dataFile = dataFile;
+            this.sstable = sstable;
+        }
+
+        public long getTotalBytes()
+        {
+            try
+            {
+                return dataFile.length();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public long getBytesComplete()
+        {
+            return dataFile.getFilePointer();
+        }
+
+        public String getTaskType()
+        {
+            return "Scrub " + sstable;
+        }
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Feb 23 03:44:26 2011
@@ -134,9 +134,9 @@ implements Closeable, ICompactionInfo
         {
             logger.info(String.format("Compacting large row %s (%d bytes) incrementally",
                                       ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize));
-            return new LazilyCompactedRow(cfs, rows, major, gcBefore);
+            return new LazilyCompactedRow(cfs, rows, major, gcBefore, false);
         }
-        return new PrecompactedRow(cfs, rows, major, gcBefore);
+        return new PrecompactedRow(cfs, rows, major, gcBefore, false);
     }
 
     public void close() throws IOException

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Wed Feb 23 03:44:26 2011
@@ -59,15 +59,17 @@ public class LazilyCompactedRow extends 
     private final boolean shouldPurge;
     private final int gcBefore;
     private final DataOutputBuffer headerBuffer;
+    private final boolean forceDeserialize;
     private ColumnFamily emptyColumnFamily;
     private LazyColumnIterator iter;
     private int columnCount;
     private long columnSerializedSize;
 
-    public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+    public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
     {
         super(rows.get(0).getKey());
         this.gcBefore = gcBefore;
+        this.forceDeserialize = forceDeserialize;
         this.rows = new ArrayList<SSTableIdentityIterator>(rows);
 
         Set<SSTable> sstables = new HashSet<SSTable>();
@@ -94,7 +96,7 @@ public class LazilyCompactedRow extends 
 
     public void write(DataOutput out) throws IOException
     {
-        if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion)
+        if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion && !forceDeserialize)
         {
             SSTableIdentityIterator row = rows.get(0);
             out.writeLong(row.dataSize);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java Wed Feb 23 03:44:26 2011
@@ -55,7 +55,7 @@ public class PrecompactedRow extends Abs
         this.buffer = buffer;
     }
 
-    public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+    public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
     {
         super(rows.get(0).getKey());
         buffer = new DataOutputBuffer();
@@ -67,7 +67,7 @@ public class PrecompactedRow extends Abs
         }
         boolean shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key, sstables);
 
-        if (rows.size() > 1 || shouldPurge || !rows.get(0).sstable.descriptor.isLatestVersion)
+        if (rows.size() > 1 || shouldPurge || !rows.get(0).sstable.descriptor.isLatestVersion || forceDeserialize)
         {
             ColumnFamily cf = null;
             for (SSTableIdentityIterator row : rows)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Feb 23 03:44:26 2011
@@ -24,15 +24,22 @@ package org.apache.cassandra.io.sstable;
 import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
+import java.util.ArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.utils.Filter;
 
 public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
 {
+    private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class);
+
     private final DecoratedKey key;
     private final long finishedAt;
     private final BufferedRandomAccessFile file;
@@ -56,6 +63,12 @@ public class SSTableIdentityIterator imp
     public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize)
     throws IOException
     {
+        this(sstable, file, key, dataStart, dataSize, false);
+    }
+
+    public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean deserializeRowHeader)
+    throws IOException
+    {
         this.sstable = sstable;
         this.file = file;
         this.key = key;
@@ -66,6 +79,28 @@ public class SSTableIdentityIterator imp
         try
         {
             file.seek(this.dataStart);
+            if (deserializeRowHeader)
+            {
+                try
+                {
+                    IndexHelper.defreezeBloomFilter(file, sstable.descriptor.usesOldBloomFilter);
+                }
+                catch (Exception e)
+                {
+                    logger.info("Invalid bloom filter in " + sstable + "; will rebuild it");
+                    // deFreeze should have left the file position ready to deserialize index
+                }
+                try
+                {
+                    IndexHelper.deserializeIndex(file);
+                }
+                catch (Exception e)
+                {
+                    logger.info("Invalid row summary in " + sstable + "; will rebuild it");
+                }
+                file.seek(this.dataStart);
+            }
+
             IndexHelper.skipBloomFilter(file);
             IndexHelper.skipIndex(file);
             columnFamily = sstable.createColumnFamily();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Wed Feb 23 03:44:26 2011
@@ -53,10 +53,9 @@ public class BufferedRandomAccessFile ex
     // buffer which will cache file blocks
     private ByteBuffer buffer;
 
-    // `current` is the current user-visible position in in the file, i.e., corresponding to getFilePointer() or seek()
-    // `bufferOffset` is the position in the file of the beginning of the buffer
+    // `current` as current position in file
+    // `bufferOffset` is the offset of the beginning of the buffer
     // `bufferEnd` is `bufferOffset` + count of bytes read from file, i.e. the lowest position we can't read from the buffer
-    // (NOT the same as bufferOffset + buffer.length since buffer may not be completely full)
     private long bufferOffset, bufferEnd, current = 0;
 
     // max buffer size is set according to (int size) parameter in the
@@ -202,6 +201,7 @@ public class BufferedRandomAccessFile ex
             buffer.rewind();
             bufferEnd = bufferOffset;
             hitEOF = true;
+
             return 0;
         }
 
@@ -466,6 +466,11 @@ public class BufferedRandomAccessFile ex
         return (int) bytes;
     }
 
+    public static BufferedRandomAccessFile getUncachingReader(String filename) throws IOException
+    {
+        return new BufferedRandomAccessFile(new File(filename), "r", 8 * 1024 * 1024, true);
+    }
+
     /**
      * Class to hold a mark to the position of the file
      */

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Wed Feb 23 03:44:26 2011
@@ -1234,13 +1234,19 @@ public class StorageService implements I
     {
         if (tableName.equals("system"))
             throw new RuntimeException("Cleanup of the system table is neither necessary nor wise");
-                    
+
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
         {
             cfStore.forceCleanup();
         }
     }
 
+    public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
+            cfStore.scrub();
+    }
+
     public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Feb 23 03:44:26 2011
@@ -158,6 +158,14 @@ public interface StorageServiceMBean
     public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 
     /**
+     * Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace.
+     * If columnFamilies array is empty, all CFs are scrubbed.
+     *
+     * Scrubbed CFs will be snapshotted first.
+     */
+    public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+
+    /**
      * Flush all memtables for the given column families, or all columnfamilies for the given table
      * if none are explicitly listed.
      * @param tableName

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java Wed Feb 23 03:44:26 2011
@@ -72,7 +72,7 @@ public class NodeCmd {
 
     public enum NodeCommand {
         RING, INFO, CFSTATS, SNAPSHOT, CLEARSNAPSHOT, VERSION, TPSTATS, FLUSH, DRAIN,
-        DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT,
+        DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB,
         SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
         COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE,
         DISABLETHRIFT, ENABLETHRIFT
@@ -114,6 +114,7 @@ public class NodeCmd {
         addCmdHelp(header, "repair [keyspace] [cfnames]", "Repair one or more column family");
         addCmdHelp(header, "cleanup [keyspace] [cfnames]", "Run cleanup on one or more column family");
         addCmdHelp(header, "compact [keyspace] [cfnames]", "Force a (major) compaction on one or more column family");
+        addCmdHelp(header, "scrub [keyspace] [cfnames]", "Scrub (rebuild sstables for) one or more column family");
         addCmdHelp(header, "invalidatekeycache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family");
         addCmdHelp(header, "invalidaterowcache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family");
         addCmdHelp(header, "getcompactionthreshold <keyspace> <cfname>", "Print min and max compaction thresholds for a given column family");
@@ -574,6 +575,7 @@ public class NodeCmd {
             case COMPACT :
             case REPAIR  :
             case FLUSH   :
+            case SCRUB   :
             case INVALIDATEKEYCACHE :
             case INVALIDATEROWCACHE :
                 optionalKSandCFs(nc, arguments, probe);
@@ -628,51 +630,22 @@ public class NodeCmd {
 
     private static void optionalKSandCFs(NodeCommand nc, String[] cmdArgs, NodeProbe probe) throws InterruptedException, IOException
     {
-        // Per-keyspace
-        if (cmdArgs.length == 1)
+        // cmdArgs[0] is "scrub"
+        // if there is one additional arg, it's the keyspace; more are columnfamilies
+        List<String> keyspaces = cmdArgs.length == 1 ? probe.getKeyspaces() : Arrays.asList(cmdArgs[1]);
+        for (String keyspace : keyspaces)
         {
-            for (String keyspace : probe.getKeyspaces())
-            {
-                switch (nc)
-                {
-                    case REPAIR             : probe.forceTableRepair(keyspace); break;
-                    case INVALIDATEKEYCACHE : probe.invalidateKeyCaches(keyspace); break;
-                    case INVALIDATEROWCACHE : probe.invalidateRowCaches(keyspace); break;
-                    case FLUSH   :
-                        try { probe.forceTableFlush(keyspace); }
-                        catch (ExecutionException ee) { err(ee, "Error occured while flushing keyspace " + keyspace); }
-                        break;
-                    case COMPACT :
-                        try { probe.forceTableCompaction(keyspace); }
-                        catch (ExecutionException ee) { err(ee, "Error occured while compacting keyspace " + keyspace); }
-                        break;
-                    case CLEANUP :
-                        if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs.
-                        try { probe.forceTableCleanup(keyspace); }
-                        catch (ExecutionException ee) { err(ee, "Error occured while cleaning up keyspace " + keyspace); }
-                        break;
-                    default:
-                        throw new RuntimeException("Unreachable code.");
-                }
-            }
-        }
-        // Per-cf (or listed cfs) in given keyspace
-        else
-        {
-            String keyspace = cmdArgs[1];
-            
-            // Check if this keyspace exists
             if (!probe.getKeyspaces().contains(keyspace))
             {
                 System.err.println("Keyspace [" + keyspace + "] does not exist.");
                 System.exit(1);
             }
-                
-            String[] columnFamilies = new String[cmdArgs.length - 2];
-            for (int i = 0; i < columnFamilies.length; i++)
-            {
-                columnFamilies[i] = cmdArgs[i + 2];
-            }
+        }
+
+        // second loop so we're less likely to die halfway through due to invalid keyspace
+        for (String keyspace : keyspaces)
+        {
+            String[] columnFamilies = cmdArgs.length <= 2 ? new String[0] : Arrays.copyOfRange(cmdArgs, 2, cmdArgs.length);
             switch (nc)
             {
                 case REPAIR  : probe.forceTableRepair(keyspace, columnFamilies); break;
@@ -687,9 +660,14 @@ public class NodeCmd {
                     catch (ExecutionException ee) { err(ee, "Error occured during compaction"); }
                     break;
                 case CLEANUP :
+                    if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs.
                     try { probe.forceTableCleanup(keyspace, columnFamilies); }
                     catch (ExecutionException ee) { err(ee, "Error occured during cleanup"); }
                     break;
+                case SCRUB :
+                    try { probe.scrub(keyspace, columnFamilies); }
+                    catch (ExecutionException ee) { err(ee, "Error occured while scrubbing keyspace " + keyspace); }
+                    break;
                 default:
                     throw new RuntimeException("Unreachable code.");
             }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Feb 23 03:44:26 2011
@@ -167,6 +167,11 @@ public class NodeProbe
         ssProxy.forceTableCleanup(tableName, columnFamilies);
     }
 
+    public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        ssProxy.scrub(tableName, columnFamilies);
+    }
+
     public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
         ssProxy.forceTableCompaction(tableName, columnFamilies);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java Wed Feb 23 03:44:26 2011
@@ -157,7 +157,7 @@ public final class CLibrary
         }
     }
 
-    private static void createHardLinkWithExec(File sourceFile, File destinationFile) throws IOException
+    public static void createHardLinkWithExec(File sourceFile, File destinationFile) throws IOException
     {
         String osname = System.getProperty("os.name");
         ProcessBuilder pb;

Modified: cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml Wed Feb 23 03:44:26 2011
@@ -82,6 +82,11 @@ keyspaces:
           rows_cached: 0
           keys_cached: 0
 
+        - name: Super5
+          column_type: Super
+          rows_cached: 0
+          keys_cached: 0
+            
         - name: Indexed1
           column_metadata:
             - name: birthdate

Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.

Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.

Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.

Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.

Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java?rev=1073586&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java Wed Feb 23 03:44:26 2011
@@ -0,0 +1,152 @@
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CLibrary;
+
+import static org.apache.cassandra.Util.column;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ScrubTest extends CleanupHelper
+{
+    public String TABLE = "Keyspace1";
+    public String CF = "Standard1";
+    public String CF2 = "Super5";
+    public String  corruptSSTableName;
+
+    
+    public void copySSTables() throws IOException 
+    {
+        String root = System.getProperty("corrupt-sstable-root");
+        assert root != null;
+        File rootDir = new File(root);
+        assert rootDir.isDirectory();
+        
+        String[] destDirs = DatabaseDescriptor.getAllDataFileLocationsForTable(TABLE);
+        assert destDirs != null;
+        assert destDirs.length > 0;
+       
+        FileUtils.createDirectory(destDirs[0]);
+        for (File srcFile : rootDir.listFiles())
+        {
+            if (srcFile.getName().equals(".svn"))
+                continue;
+            File destFile = new File(destDirs[0]+File.separator+srcFile.getName());
+            CLibrary.createHardLinkWithExec(srcFile, destFile);
+                        
+            destFile = new File(destDirs[0]+File.separator+srcFile.getName());
+                        
+            assert destFile.exists() : destFile.getAbsoluteFile();
+            
+            if(destFile.getName().endsWith("Data.db"))
+                corruptSSTableName = destFile.getCanonicalPath();
+        }   
+
+        assert corruptSSTableName != null;
+    }
+   
+    @Test
+    public void testScrubFile() throws Exception
+    {        
+        copySSTables();
+
+        Table table = Table.open(TABLE);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2);
+        assert cfs.getSSTables().size() > 0;
+      
+        List<Row> rows;
+        boolean caught = false;
+        try
+        {
+             rows = cfs.getRangeSlice(ByteBufferUtil.bytes("1"), Util.range("", ""), 1000, new IdentityQueryFilter());
+             fail("This slice should fail");
+        }
+        catch (NegativeArraySizeException e)
+        {
+            caught = true;
+        }
+        assert caught : "'corrupt' test file actually was not";
+        
+        CompactionManager.instance.performScrub(cfs);
+        rows = cfs.getRangeSlice(ByteBufferUtil.bytes("1"), Util.range("", ""), 1000, new IdentityQueryFilter());
+        assertEquals(100, rows.size());
+    }
+    
+    
+    @Test
+    public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Table table = Table.open(TABLE);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
+
+        List<Row> rows;
+
+        // insert data and verify we get it back w/ range query
+        fillCF(cfs, 1);
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        assertEquals(1, rows.size());
+
+        CompactionManager.instance.performScrub(cfs);
+
+        // check data is still there
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        assertEquals(1, rows.size());
+    }
+
+    @Test
+    public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Table table = Table.open(TABLE);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
+
+        List<Row> rows;
+
+        // insert data and verify we get it back w/ range query
+        fillCF(cfs, 10);
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        assertEquals(10, rows.size());
+
+        CompactionManager.instance.performScrub(cfs);
+
+        // check data is still there
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        assertEquals(10, rows.size());
+    }
+      
+    protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
+    {
+        for (int i = 0; i < rowsPerSSTable; i++)
+        {
+            String key = String.valueOf(i);
+            // create a row and update the birthdate value, test that the index query fetches the new version
+            RowMutation rm;
+            rm = new RowMutation(TABLE, ByteBufferUtil.bytes(key));
+            ColumnFamily cf = ColumnFamily.create(TABLE, CF);
+            cf.addColumn(column("c1", "1", 1L));
+            cf.addColumn(column("c2", "2", 1L));
+            rm.add(cf);
+            rm.applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+    }
+
+    
+    
+    
+}

Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Wed Feb 23 03:44:26 2011
@@ -234,7 +234,7 @@ public class LazilyCompactedRowTest exte
         @Override
         protected AbstractCompactedRow getCompactedRow()
         {
-            return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE);
+            return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE, true);
         }
     }
 }