You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/06/29 11:19:14 UTC

[3/4] git commit: Improve scrub and allow to run it offline

Improve scrub and allow to run it offline

patch by slebresne; reviewed by jbellis for CASSANDRA-4321


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75453d01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75453d01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75453d01

Branch: refs/heads/trunk
Commit: 75453d0179b841ef2e63475f81484661c2e502cb
Parents: 88b48df
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Jun 29 10:37:10 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jun 29 10:37:10 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 bin/sstablescrub                                   |   50 ++
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   26 +-
 src/java/org/apache/cassandra/db/DefsTable.java    |    2 +-
 src/java/org/apache/cassandra/db/Table.java        |   20 +-
 .../db/compaction/CompactionController.java        |   20 +-
 .../cassandra/db/compaction/CompactionManager.java |  226 +---------
 .../cassandra/db/compaction/LeveledManifest.java   |   21 +-
 .../apache/cassandra/db/compaction/Scrubber.java   |  355 +++++++++++++++
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |    6 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |   12 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |   27 +-
 .../apache/cassandra/service/StorageService.java   |    9 +-
 .../org/apache/cassandra/tools/BulkLoader.java     |   24 +-
 .../apache/cassandra/tools/StandaloneScrubber.java |  282 ++++++++++++
 .../org/apache/cassandra/utils/OutputHandler.java  |   95 ++++
 test/unit/org/apache/cassandra/db/ScrubTest.java   |   70 +++-
 17 files changed, 963 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e9f34ed..65f9fbb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,7 @@
  * (cql3) allow updating column_alias types (CASSANDRA-4041)
  * (cql3) Fix deletion bug (CASSANDRA-4193)
  * Fix computation of overlapping sstable for leveled compaction (CASSANDRA-4321)
+ * Improve scrub and allow to run it offline (CASSANDRA-4321)
 Merged from 1.0:
  * Set gc_grace on index CF to 0 (CASSANDRA-4314)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/bin/sstablescrub
----------------------------------------------------------------------
diff --git a/bin/sstablescrub b/bin/sstablescrub
new file mode 100644
index 0000000..437bee0
--- /dev/null
+++ b/bin/sstablescrub
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   `dirname $0`/cassandra.in.sh; do
+        if [ -r $include ]; then
+            . $include
+            break
+        fi
+    done
+elif [ -r $CASSANDRA_INCLUDE ]; then
+    . $CASSANDRA_INCLUDE
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ -z $CLASSPATH ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+$JAVA -ea -cp $CLASSPATH -Xmx256M \
+        -Dlog4j.configuration=log4j-tools.properties \
+        org.apache.cassandra.tools.StandaloneScrubber "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5c7e3b2..a40e52e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -205,7 +205,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                               IPartitioner partitioner,
                               int generation,
                               CFMetaData metadata,
-                              Directories directories)
+                              Directories directories,
+                              boolean loadSSTables)
     {
         assert metadata != null : "null metadata for " + table + ":" + columnFamilyName;
 
@@ -230,8 +231,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                        ? Collections.<DecoratedKey>emptySet()
                                        : CacheService.instance.keyCache.readSaved(table.name, columnFamily, partitioner);
 
-        Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true);
-        data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner));
+        if (loadSSTables)
+        {
+            Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true);
+            data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner));
+        }
 
         // compaction strategy should be created after the CFS has been prepared
         this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
@@ -304,15 +308,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return data.getMeanColumns();
     }
 
-    public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily)
+    public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily, boolean loadSSTables)
+    {
+        return createColumnFamilyStore(table, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(table.name, columnFamily), loadSSTables);
+    }
+
+    public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
     {
-        return createColumnFamilyStore(table, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(table.name, columnFamily));
+        return createColumnFamilyStore(table, columnFamily, partitioner, metadata, true);
     }
 
-    public static synchronized ColumnFamilyStore createColumnFamilyStore(Table table,
+    private static synchronized ColumnFamilyStore createColumnFamilyStore(Table table,
                                                                          String columnFamily,
                                                                          IPartitioner partitioner,
-                                                                         CFMetaData metadata)
+                                                                         CFMetaData metadata,
+                                                                         boolean loadSSTables)
     {
         // get the max generation number, to prevent generation conflicts
         Directories directories = Directories.create(table.name, columnFamily);
@@ -328,7 +338,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Collections.sort(generations);
         int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
 
-        return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata, directories);
+        return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata, directories, loadSSTables);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index be23934..93f9867 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -433,7 +433,7 @@ public class DefsTable
         Schema.instance.setTableDefinition(ksm);
 
         if (!StorageService.instance.isClientMode())
-            Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName);
+            Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
     }
 
     private static void updateKeyspace(KSMetaData newState) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 63e7a96..f3a414e 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -94,10 +94,15 @@ public class Table
 
     public static Table open(String table)
     {
-        return open(table, Schema.instance);
+        return open(table, Schema.instance, true);
     }
 
-    public static Table open(String table, Schema schema)
+    public static Table openWithoutSSTables(String table)
+    {
+        return open(table, Schema.instance, false);
+    }
+
+    private static Table open(String table, Schema schema, boolean loadSSTables)
     {
         Table tableInstance = schema.getTableInstance(table);
 
@@ -111,7 +116,7 @@ public class Table
                 if (tableInstance == null)
                 {
                     // open and store the table
-                    tableInstance = new Table(table);
+                    tableInstance = new Table(table, loadSSTables);
                     schema.storeTableInstance(tableInstance);
 
                     // table has to be constructed and in the cache before cacheRow can be called
@@ -275,7 +280,7 @@ public class Table
         return list;
     }
 
-    private Table(String table)
+    private Table(String table, boolean loadSSTables)
     {
         name = table;
         KSMetaData ksm = Schema.instance.getKSMetaData(table);
@@ -296,9 +301,8 @@ public class Table
         for (CFMetaData cfm : new ArrayList<CFMetaData>(Schema.instance.getTableDefinition(table).cfMetaData().values()))
         {
             logger.debug("Initializing {}.{}", name, cfm.cfName);
-            initCf(cfm.cfId, cfm.cfName);
+            initCf(cfm.cfId, cfm.cfName, loadSSTables);
         }
-
     }
 
     public void createReplicationStrategy(KSMetaData ksm) throws ConfigurationException
@@ -343,7 +347,7 @@ public class Table
     }
 
     /** adds a cf to internal structures, ends up creating disk files). */
-    public void initCf(Integer cfId, String cfName)
+    public void initCf(Integer cfId, String cfName, boolean loadSSTables)
     {
         if (columnFamilyStores.containsKey(cfId))
         {
@@ -364,7 +368,7 @@ public class Table
         }
         else
         {
-            columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName));
+            columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 8ff871e..1a90ab0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -66,6 +66,19 @@ public class CompactionController
 
     public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
     {
+        this(cfs,
+             gcBefore,
+             forceDeserialize || !allLatestVersion(sstables),
+             DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables)),
+             cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables)));
+    }
+
+    protected CompactionController(ColumnFamilyStore cfs,
+                                   int gcBefore,
+                                   boolean deserializeRequired,
+                                   IntervalTree<SSTableReader> overlappingTree,
+                                   boolean keyExistenceIsExpensive)
+    {
         assert cfs != null;
         this.cfs = cfs;
         this.gcBefore = gcBefore;
@@ -74,10 +87,9 @@ public class CompactionController
         // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
         // current 'stop all write during memtable switch' situation).
         this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
-        deserializeRequired = forceDeserialize || !allLatestVersion(sstables);
-        Set<SSTableReader> overlappingSSTables = cfs.getOverlappingSSTables(sstables);
-        overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
-        keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables));
+        this.deserializeRequired = deserializeRequired;
+        this.overlappingTree = overlappingTree;
+        this.keyExistenceIsExpensive = keyExistenceIsExpensive;
     }
 
     public String getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fb5b253..3c50a09 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -473,203 +473,27 @@ public class CompactionManager implements CompactionManagerMBean
 
     private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
-        logger.info("Scrubbing " + sstable);
-        CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), true);
-        boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
+        Scrubber scrubber = new Scrubber(cfs, sstable);
 
-        // Calculate the expected compacted filesize
-        File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(sstable.onDiskLength());
-        if (compactionFileLocation == null)
-            throw new IOException("disk full");
-        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
-                                               (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
-
-        // loop through each row, deserializing to check for damage.
-        // we'll also loop through the index at the same time, using the position from the index to recover if the
-        // row header (key or data size) is corrupt. (This means our position in the index file will be one row
-        // "ahead" of the data file.)
-        final RandomAccessReader dataFile = sstable.openDataReader(true);
-        long rowsRead = 0;
-        RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
-        ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
+        CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
         executor.beginCompaction(scrubInfo);
-
-        SSTableWriter writer = null;
-        SSTableReader newSstable = null;
-        int goodRows = 0, badRows = 0, emptyRows = 0;
-
         try
         {
-            ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
-            {
-                // throw away variable so we don't have a side effect in the assert
-                long firstRowPositionFromIndex = indexFile.readLong();
-                assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
-            }
-
-            // TODO errors when creating the writer may leave empty temp files.
-            writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable));
-
-            while (!dataFile.isEOF())
-            {
-                if (scrubInfo.isStopRequested())
-                    throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
-                long rowStart = dataFile.getFilePointer();
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading row at " + rowStart);
-
-                DecoratedKey key = null;
-                long dataSize = -1;
-                try
-                {
-                    key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
-                    dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
-                    if (logger.isDebugEnabled())
-                        logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
-                }
-                catch (Throwable th)
-                {
-                    throwIfFatal(th);
-                    // check for null key below
-                }
-
-                ByteBuffer currentIndexKey = nextIndexKey;
-                long nextRowPositionFromIndex;
-                try
-                {
-                    nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                    nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
-                }
-                catch (Throwable th)
-                {
-                    logger.warn("Error reading index file", th);
-                    nextIndexKey = null;
-                    nextRowPositionFromIndex = dataFile.length();
-                }
-
-                long dataStart = dataFile.getFilePointer();
-                long dataStartFromIndex = currentIndexKey == null
-                                        ? -1
-                                        : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
-                long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
-                assert currentIndexKey != null || indexFile.isEOF();
-                if (logger.isDebugEnabled() && currentIndexKey != null)
-                    logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
-
-                writer.mark();
-                try
-                {
-                    if (key == null)
-                        throw new IOError(new IOException("Unable to read row key from data file"));
-                    if (dataSize > dataFile.length())
-                        throw new IOError(new IOException("Impossible row size " + dataSize));
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (compactedRow.isEmpty())
-                    {
-                        emptyRows++;
-                    }
-                    else
-                    {
-                        writer.append(compactedRow);
-                        goodRows++;
-                    }
-                    if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
-                        logger.warn("Index file contained a different key or row size; using key from data file");
-                }
-                catch (Throwable th)
-                {
-                    throwIfFatal(th);
-                    logger.warn("Non-fatal error reading row (stacktrace follows)", th);
-                    writer.resetAndTruncate();
-
-                    if (currentIndexKey != null
-                        && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
-                    {
-                        logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
-                                                  dataSizeFromIndex, dataStartFromIndex));
-                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
-                        try
-                        {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (compactedRow.isEmpty())
-                            {
-                                emptyRows++;
-                            }
-                            else
-                            {
-                                writer.append(compactedRow);
-                                goodRows++;
-                            }
-                        }
-                        catch (Throwable th2)
-                        {
-                            throwIfFatal(th2);
-                            // Skipping rows is dangerous for counters (see CASSANDRA-2759)
-                            if (isCommutative)
-                                throw new IOError(th2);
-
-                            logger.warn("Retry failed too.  Skipping to next row (retry's stacktrace follows)", th2);
-                            writer.resetAndTruncate();
-                            dataFile.seek(nextRowPositionFromIndex);
-                            badRows++;
-                        }
-                    }
-                    else
-                    {
-                        // Skipping rows is dangerous for counters (see CASSANDRA-2759)
-                        if (isCommutative)
-                            throw new IOError(th);
-
-                        logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
-                        if (currentIndexKey != null)
-                            dataFile.seek(nextRowPositionFromIndex);
-                        badRows++;
-                    }
-                }
-                if ((rowsRead++ % 1000) == 0)
-                    controller.mayThrottle(dataFile.getFilePointer());
-            }
-
-            if (writer.getFilePointer() > 0)
-                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
-        }
-        catch (Exception e)
-        {
-            if (writer != null)
-                writer.abort();
-            throw FBUtilities.unchecked(e);
+            scrubber.scrub();
         }
         finally
         {
-            FileUtils.closeQuietly(dataFile);
-            FileUtils.closeQuietly(indexFile);
-
+            scrubber.close();
             executor.finishCompaction(scrubInfo);
         }
 
-        if (newSstable == null)
-        {
-            cfs.markCompacted(Arrays.asList(sstable), OperationType.SCRUB);
-            if (badRows > 0)
-                logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
-            else
-                logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
-        }
-        else
-        {
-            cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable), OperationType.SCRUB);
-            logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
-            if (badRows > 0)
-                logger.warn("Unable to recover " + badRows + " rows that were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can also run nodetool repair to transfer the data from a healthy replica, if any");
-        }
-    }
+        if (scrubber.getNewInOrderSSTable() != null)
+            cfs.addSSTable(scrubber.getNewInOrderSSTable());
 
-    private void throwIfFatal(Throwable th)
-    {
-        if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError))
-            throw (Error) th;
+        if (scrubber.getNewSSTable() == null)
+            cfs.markCompacted(Collections.singletonList(sstable), OperationType.SCRUB);
+        else
+            cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
     }
 
     /**
@@ -819,8 +643,8 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, File compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, Collection<SSTableReader> sstables)
-            throws IOException
+    public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, File compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, Collection<SSTableReader> sstables)
+    throws IOException
     {
         if (writer == null)
         {
@@ -1218,32 +1042,6 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private static class ScrubInfo extends CompactionInfo.Holder
-    {
-        private final RandomAccessReader dataFile;
-        private final SSTableReader sstable;
-        public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
-        {
-            this.dataFile = dataFile;
-            this.sstable = sstable;
-        }
-
-        public CompactionInfo getCompactionInfo()
-        {
-            try
-            {
-                return new CompactionInfo(sstable.metadata,
-                                          OperationType.SCRUB,
-                                          dataFile.getFilePointer(),
-                                          dataFile.length());
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException();
-            }
-        }
-    }
-
     public void stopCompaction(String type)
     {
         OperationType operation = OperationType.valueOf(type);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index beb7d74..9504072 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -85,11 +85,16 @@ public class LeveledManifest
 
     static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize)
     {
+        return create(cfs, maxSSTableSize, cfs.getSSTables());
+    }
+
+    public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, Iterable<SSTableReader> sstables)
+    {
         LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize);
-        load(cfs, manifest);
+        load(cfs, manifest, sstables);
 
         // ensure all SSTables are in the manifest
-        for (SSTableReader ssTableReader : cfs.getSSTables())
+        for (SSTableReader ssTableReader : sstables)
         {
             if (manifest.levelOf(ssTableReader) < 0)
                 manifest.add(ssTableReader);
@@ -98,7 +103,7 @@ public class LeveledManifest
         return manifest;
     }
 
-    private static void load(ColumnFamilyStore cfs, LeveledManifest manifest)
+    private static void load(ColumnFamilyStore cfs, LeveledManifest manifest, Iterable<SSTableReader> sstables)
     {
         File manifestFile = tryGetManifest(cfs);
         if (manifestFile == null)
@@ -116,7 +121,7 @@ public class LeveledManifest
                 JsonNode generationValues = generation.get("members");
                 for (JsonNode generationValue : generationValues)
                 {
-                    for (SSTableReader ssTableReader : cfs.getSSTables())
+                    for (SSTableReader ssTableReader : sstables)
                     {
                         if (ssTableReader.descriptor.generation == generationValue.getIntValue())
                         {
@@ -217,6 +222,14 @@ public class LeveledManifest
         serialize();
     }
 
+    public synchronized void sendBackToL0(SSTableReader sstable)
+    {
+        remove(sstable);
+        add(sstable, 0);
+
+        serialize();
+    }
+
     private String toString(Iterable<SSTableReader> sstables)
     {
         StringBuilder builder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
new file mode 100644
index 0000000..314a873
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.io.*;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IntervalTree.*;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class Scrubber implements Closeable
+{
+    public final ColumnFamilyStore cfs;
+    public final SSTableReader sstable;
+    public final File destination;
+
+    private final CompactionController controller;
+    private final boolean isCommutative;
+    private final int expectedBloomFilterSize;
+
+    private final RandomAccessReader dataFile;
+    private final RandomAccessReader indexFile;
+    private final ScrubInfo scrubInfo;
+
+    private long rowsRead;
+
+    private SSTableWriter writer;
+    private SSTableReader newSstable;
+    private SSTableReader newInOrderSstable;
+
+    private int goodRows;
+    private int badRows;
+    private int emptyRows;
+
+    private final OutputHandler outputHandler;
+
+    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    {
+         public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+         {
+             return r1.key.compareTo(r2.key);
+         }
+    };
+    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
+    {
+        this(cfs, sstable, new OutputHandler.LogOutput(), false);
+    }
+
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline) throws IOException
+    {
+        this.cfs = cfs;
+        this.sstable = sstable;
+        this.outputHandler = outputHandler;
+
+        // Pick the directory the scrubbed sstable will be written to, sized by the original's on-disk length
+        this.destination = cfs.directories.getDirectoryForNewSSTables(sstable.onDiskLength());
+        if (destination == null)
+            throw new IOException("disk full");
+
+        List<SSTableReader> toScrub = Collections.singletonList(sstable);
+        // If we run scrub offline, we should never purge tombstones, as we cannot know whether other sstables have data that a tombstone deletes.
+        this.controller = isOffline
+                        ? new ScrubController(cfs)
+                        : new CompactionController(cfs, Collections.singletonList(sstable), CompactionManager.getDefaultGcBefore(cfs), true);
+        this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
+        this.expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub)));
+
+        // scrub() walks the data file row by row, deserializing each one to check for damage.
+        // It reads the index in lockstep, so that the position from the index can be used to recover when the
+        // row header (key or data size) is corrupt. (This means our position in the index file will be one row
+        // "ahead" of the data file.)
+        this.dataFile = sstable.openDataReader(true);
+        this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+        this.scrubInfo = new ScrubInfo(dataFile, sstable);
+    }
+
+    public void scrub() throws IOException
+    {
+        outputHandler.output("Scrubbing " + sstable);
+        try
+        {
+            ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+            {
+                // throw away variable so we don't have a side effect in the assert
+                long firstRowPositionFromIndex = indexFile.readLong();
+                assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
+            }
+
+            // TODO errors when creating the writer may leave empty temp files.
+            writer = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+
+            AbstractCompactedRow prevRow = null;
+
+            while (!dataFile.isEOF())
+            {
+                if (scrubInfo.isStopRequested())
+                    throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
+                long rowStart = dataFile.getFilePointer();
+                outputHandler.debug("Reading row at " + rowStart);
+
+                DecoratedKey key = null;
+                long dataSize = -1;
+                try
+                {
+                    key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                    dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+                    outputHandler.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    // check for null key below
+                }
+
+                ByteBuffer currentIndexKey = nextIndexKey;
+                long nextRowPositionFromIndex;
+                try
+                {
+                    nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+                    nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+                }
+                catch (Throwable th)
+                {
+                    outputHandler.warn("Error reading index file", th);
+                    nextIndexKey = null;
+                    nextRowPositionFromIndex = dataFile.length();
+                }
+
+                long dataStart = dataFile.getFilePointer();
+                long dataStartFromIndex = currentIndexKey == null
+                                        ? -1
+                                        : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+                long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                assert currentIndexKey != null || indexFile.isEOF();
+                if (currentIndexKey != null)
+                    outputHandler.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
+
+                writer.mark();
+                try
+                {
+                    if (key == null)
+                        throw new IOError(new IOException("Unable to read row key from data file"));
+                    if (dataSize > dataFile.length())
+                        throw new IOError(new IOException("Impossible row size " + dataSize));
+                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                    if (compactedRow.isEmpty())
+                    {
+                        emptyRows++;
+                    }
+                    else
+                    {
+                        if (prevRow != null && acrComparator.compare(prevRow, compactedRow) > 0)
+                        {
+                            outOfOrderRows.add(compactedRow);
+                            outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                            continue;
+                        }
+
+                        writer.append(compactedRow);
+                        prevRow = compactedRow;
+                        goodRows++;
+                    }
+                    if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+                        outputHandler.warn("Index file contained a different key or row size; using key from data file");
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    outputHandler.warn("Non-fatal error reading row (stacktrace follows)", th);
+                    writer.resetAndTruncate();
+
+                    if (currentIndexKey != null
+                        && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                    {
+                        outputHandler.output(String.format("Retrying from row index; data is %s bytes starting at %s",
+                                                  dataSizeFromIndex, dataStartFromIndex));
+                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+                        try
+                        {
+                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                            if (compactedRow.isEmpty())
+                            {
+                                emptyRows++;
+                            }
+                            else
+                            {
+                                if (prevRow != null && acrComparator.compare(prevRow, compactedRow) > 0)
+                                {
+                                    outOfOrderRows.add(compactedRow);
+                                    outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                    continue;
+                                }
+                                writer.append(compactedRow);
+                                prevRow = compactedRow;
+                                goodRows++;
+                            }
+                        }
+                        catch (Throwable th2)
+                        {
+                            throwIfFatal(th2);
+                            // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+                            if (isCommutative)
+                                throw new IOError(th2);
+
+                            outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
+                            writer.resetAndTruncate();
+                            dataFile.seek(nextRowPositionFromIndex);
+                            badRows++;
+                        }
+                    }
+                    else
+                    {
+                        // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+                        if (isCommutative)
+                            throw new IOError(th);
+
+                        outputHandler.warn("Row at " + dataStart + " is unreadable; skipping to next");
+                        if (currentIndexKey != null)
+                            dataFile.seek(nextRowPositionFromIndex);
+                        badRows++;
+                    }
+                }
+                if ((rowsRead++ % 1000) == 0)
+                    controller.mayThrottle(dataFile.getFilePointer());
+            }
+
+            if (writer.getFilePointer() > 0)
+                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+        }
+        catch (Exception e)
+        {
+            if (writer != null)
+                writer.abort();
+            throw FBUtilities.unchecked(e);
+        }
+
+        if (!outOfOrderRows.isEmpty())
+        {
+            SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+            for (AbstractCompactedRow row : outOfOrderRows)
+                inOrderWriter.append(row);
+            newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+            outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
+        }
+
+        if (newSstable == null)
+        {
+            if (badRows > 0)
+                outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
+            else
+                outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
+        }
+        else
+        {
+            outputHandler.output("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+            if (badRows > 0)
+                outputHandler.warn("Unable to recover " + badRows + " rows that were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can also run nodetool repair to transfer the data from a healthy replica, if any");
+        }
+    }
+
+    public SSTableReader getNewSSTable()
+    {
+        return newSstable;
+    }
+
+    public SSTableReader getNewInOrderSSTable()
+    {
+        return newInOrderSstable;
+    }
+
+    private void throwIfFatal(Throwable th) // rethrows any Error except AssertionError and IOError
+    {
+        if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError))
+            throw (Error) th;
+    }
+
+    public void close()
+    {
+        FileUtils.closeQuietly(dataFile);
+        FileUtils.closeQuietly(indexFile);
+    }
+
+    public CompactionInfo.Holder getScrubInfo()
+    {
+        return scrubInfo;
+    }
+
+    private static class ScrubInfo extends CompactionInfo.Holder
+    {
+        private final RandomAccessReader dataFile;
+        private final SSTableReader sstable;
+
+        public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
+        {
+            this.dataFile = dataFile;
+            this.sstable = sstable;
+        }
+
+        public CompactionInfo getCompactionInfo() // reports SCRUB progress: data file position vs. data file length
+        {
+            try
+            {
+                return new CompactionInfo(sstable.metadata,
+                                          OperationType.SCRUB,
+                                          dataFile.getFilePointer(),
+                                          dataFile.length());
+            }
+            catch (Exception e) // reading the file position/length failed
+            {
+                throw new RuntimeException(e); // chain the cause so the underlying failure stays diagnosable
+            }
+        }
+    }
+
+    private static class ScrubController extends CompactionController // controller used when scrubbing offline
+    {
+        public ScrubController(ColumnFamilyStore cfs)
+        {
+            super(cfs, Integer.MAX_VALUE, true, new IntervalTree<SSTableReader>(Collections.<Interval>emptyList()), false);
+        }
+
+        @Override
+        public boolean shouldPurge(DecoratedKey key)
+        {
+            return false; // offline we cannot know what other sstables hold, so tombstones are never purged
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index cfd5fe4..25033ec 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -345,10 +346,11 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         }
     }
 
-    static class NullOutputHandler implements SSTableLoader.OutputHandler
+    static class NullOutputHandler implements OutputHandler
     {
         public void output(String msg) {}
-
         public void debug(String msg) {}
+        public void warn(String msg) {}
+        public void warn(String msg, Throwable th) {}
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 850d23b..b40d80b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -35,8 +35,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
 
 /**
  * Cassandra SSTable bulk loader.
@@ -252,15 +251,6 @@ public class SSTableLoader
         }
     }
 
-    public interface OutputHandler
-    {
-        // called when an important info need to be displayed
-        public void output(String msg);
-
-        // called when a less important info need to be displayed
-        public void debug(String msg);
-    }
-
     public static abstract class Client
     {
         private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<InetAddress, Collection<Range<Token>>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 7d4d304..0aaa932 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -138,6 +138,11 @@ public class SSTableReader extends SSTable
         return open(desc, componentsFor(desc), metadata, p);
     }
 
+    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
+    {
+        return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, StorageService.getPartitioner(), false); // false: skip validate()
+    }
+
     public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
         return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, partitioner);
@@ -145,6 +150,17 @@ public class SSTableReader extends SSTable
 
     public static SSTableReader open(Descriptor descriptor, Set<Component> components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
+        return open(descriptor, components, savedKeys, tracker, metadata, partitioner, true);
+    }
+
+    private static SSTableReader open(Descriptor descriptor,
+                                      Set<Component> components,
+                                      Set<DecoratedKey> savedKeys,
+                                      DataTracker tracker,
+                                      CFMetaData metadata,
+                                      IPartitioner partitioner,
+                                      boolean validate) throws IOException
+    {
         assert partitioner != null;
         // Minimum components without which we can't do anything
         assert components.contains(Component.DATA);
@@ -187,6 +203,10 @@ public class SSTableReader extends SSTable
             sstable.load(false, savedKeys);
             sstable.loadBloomFilter();
         }
+
+        if (validate)
+            sstable.validate();
+
         if (logger.isDebugEnabled())
             logger.debug("INDEX LOAD TIME for " + descriptor + ": " + (System.currentTimeMillis() - start) + " ms.");
 
@@ -409,13 +429,18 @@ public class SSTableReader extends SSTable
         }
         this.first = getMinimalKey(left);
         this.last = getMinimalKey(right);
-        assert this.first.compareTo(this.last) <= 0: String.format("SSTable first key %s > last key %s", this.first, this.last);
 
         // finalize the state of the reader
         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
     }
 
+    private void validate() // called by open(...) when its validate flag is true
+    {
+        if (this.first.compareTo(this.last) > 0)
+            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
+    }
+
     /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
     private long getIndexScanPosition(RowPosition key)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index dbaf751..9d9690a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -68,6 +68,7 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NodeId;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
@@ -3038,13 +3039,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             }
         };
 
-        SSTableLoader.OutputHandler oh = new SSTableLoader.OutputHandler()
-        {
-            public void output(String msg) { logger_.info(msg); }
-            public void debug(String msg) { logger_.debug(msg); }
-        };
-
-        SSTableLoader loader = new SSTableLoader(dir, client, oh);
+        SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput());
         try
         {
             loader.stream().get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 4520188..ace37db 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.streaming.PendingFile;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.commons.cli.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -55,7 +56,8 @@ public class BulkLoader
         LoaderOptions options = LoaderOptions.parseArgs(args);
         try
         {
-            SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options, options.hosts, options.rpcPort), options);
+            OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
+            SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort), handler);
             DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
             SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
 
@@ -174,11 +176,11 @@ public class BulkLoader
     static class ExternalClient extends SSTableLoader.Client
     {
         private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
-        private final SSTableLoader.OutputHandler outputHandler;
+        private final OutputHandler outputHandler;
         private Set<InetAddress> hosts = new HashSet<InetAddress>();
         private int rpcPort;
 
-        public ExternalClient(SSTableLoader.OutputHandler outputHandler, Set<InetAddress> hosts, int port)
+        public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port)
         {
             super();
             this.outputHandler = outputHandler;
@@ -245,7 +247,7 @@ public class BulkLoader
         }
     }
 
-    static class LoaderOptions implements SSTableLoader.OutputHandler
+    static class LoaderOptions
     {
         public final File directory;
 
@@ -367,18 +369,6 @@ public class BulkLoader
             printUsage(options);
             System.exit(1);
         }
-
-        public void output(String msg)
-        {
-            System.out.println(msg);
-        }
-
-        public void debug(String msg)
-        {
-            if (verbose)
-                System.out.println(msg);
-        }
-
         private static CmdLineOptions getCmdLineOptions()
         {
             CmdLineOptions options = new CmdLineOptions();
@@ -409,7 +399,7 @@ public class BulkLoader
         }
     }
 
-    private static class CmdLineOptions extends Options
+    public static class CmdLineOptions extends Options
     {
         /**
          * Add option with argument and argument name

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
new file mode 100644
index 0000000..0ac6a80
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.compaction.Scrubber;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.cassandra.utils.OutputHandler;
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+public class StandaloneScrubber
+{
+    static
+    {
+        AbstractCassandraDaemon.initLog4j();
+    }
+
+    private static final String TOOL_NAME = "sstablescrub";
+    private static final String VERBOSE_OPTION  = "verbose";
+    private static final String DEBUG_OPTION  = "debug";
+    private static final String HELP_OPTION  = "help";
+    private static final String MANIFEST_CHECK_OPTION  = "manifest-check";
+
+    public static void main(String args[]) throws IOException
+    {
+        Options options = Options.parseArgs(args);
+        try
+        {
+            // load keyspace descriptions.
+            DatabaseDescriptor.loadSchemas();
+
+            if (Schema.instance.getCFMetaData(options.tableName, options.cfName) == null)
+                throw new IllegalArgumentException(String.format("Unknown keyspace/columnFamily %s.%s",
+                                                                 options.tableName,
+                                                                 options.cfName));
+
+            // Do not load sstables since they might be broken
+            Table table = Table.openWithoutSSTables(options.tableName);
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(options.cfName);
+            String snapshotName = "pre-scrub-" + System.currentTimeMillis();
+
+            OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
+            Directories.SSTableLister lister = cfs.directories.sstableLister().skipCompacted(true).skipTemporary(true);
+
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+
+            // Scrub sstables
+            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+            {
+                Set<Component> components = entry.getValue();
+                if (!components.contains(Component.DATA) || !components.contains(Component.PRIMARY_INDEX))
+                    continue;
+
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs.metadata);
+                    sstables.add(sstable);
+
+                    File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+                    sstable.createLinks(snapshotDirectory.getPath());
+
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            System.out.println(String.format("Pre-scrub sstables snapshotted into snapshot %s", snapshotName));
+
+            // If leveled, load the manifest
+            LeveledManifest manifest = null;
+            if (cfs.directories.tryGetLeveledManifest() != null)
+            {
+                cfs.directories.snapshotLeveledManifest(snapshotName);
+                System.out.println(String.format("Leveled manifest snapshotted into snapshot %s", snapshotName));
+
+                int maxSizeInMB = (int)((((LeveledCompactionStrategy)cfs.getCompactionStrategy()).getMaxSSTableSize()) / (1024L * 1024L));
+                manifest = LeveledManifest.create(cfs, maxSizeInMB, sstables);
+            }
+
+            if (!options.manifestCheckOnly)
+            {
+                for (SSTableReader sstable : sstables)
+                {
+                    try
+                    {
+                        Scrubber scrubber = new Scrubber(cfs, sstable, handler, true);
+                        try
+                        {
+                            scrubber.scrub();
+                        }
+                        finally
+                        {
+                            scrubber.close();
+                        }
+
+                        if (manifest != null)
+                        {
+                            if (scrubber.getNewInOrderSSTable() != null)
+                                manifest.add(scrubber.getNewInOrderSSTable());
+
+                            List<SSTableReader> added = scrubber.getNewSSTable() == null
+                                ? Collections.<SSTableReader>emptyList()
+                                : Collections.<SSTableReader>singletonList(scrubber.getNewSSTable());
+                            manifest.replace(Collections.singletonList(sstable), added);
+                        }
+
+                        // Remove the sstable (it's been copied by scrub and snapshotted)
+                        sstable.markCompacted();
+                        sstable.releaseReference();
+                    }
+                    catch (Exception e)
+                    {
+                        System.err.println(String.format("Error scrubbing %s: %s", sstable, e.getMessage()));
+                        if (options.debug)
+                            e.printStackTrace(System.err);
+                    }
+                }
+            }
+
+            // Check (and repair) manifest
+            if (manifest != null)
+                checkManifest(manifest);
+
+            SSTableDeletingTask.waitForDeletions();
+            System.exit(0); // We need this to stop non-daemon threads
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            if (options.debug)
+                e.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    private static void checkManifest(LeveledManifest manifest)
+    {
+        System.out.println(String.format("Checking leveled manifest"));
+        for (int i = 1; i <= manifest.getLevelCount(); ++i)
+        {
+            List<SSTableReader> sstables = new ArrayList(manifest.getLevel(i));
+            Collections.sort(sstables, SSTable.sstableComparator);
+            if (sstables.isEmpty())
+                continue;
+
+            Iterator<SSTableReader> iter = sstables.iterator();
+            SSTableReader previous = iter.next();
+            while (iter.hasNext())
+            {
+                SSTableReader current = iter.next();
+
+                if (previous.last.compareTo(current.first) > 0)
+                {
+                    System.err.println(String.format("At level %d, %s [%s, %s] overlaps %s [%s, %s]", i,
+                                                     previous, previous.first, previous.last,
+                                                     current, current.first, current.last));
+                    System.out.println(String.format("Sending %s back to L0 to fix intra-level overlapping", current));
+                    manifest.sendBackToL0(current);
+                }
+                else
+                {
+                    previous = current;
+                }
+            }
+        }
+    }
+
+    private static class Options
+    {
+        public final String tableName;
+        public final String cfName;
+
+        public boolean debug;
+        public boolean verbose;
+        public boolean manifestCheckOnly;
+
+        private Options(String tableName, String cfName)
+        {
+            this.tableName = tableName;
+            this.cfName = cfName;
+        }
+
+        public static Options parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length != 2)
+                {
+                    String msg = args.length < 2 ? "Missing arguments" : "Too many arguments";
+                    System.err.println(msg);
+                    printUsage(options);
+                    System.exit(1);
+                }
+
+                String tableName = args[0];
+                String cfName = args[1];
+
+                Options opts = new Options(tableName, cfName);
+
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.manifestCheckOnly = cmd.hasOption(MANIFEST_CHECK_OPTION);
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION,          "display stack traces");
+            options.addOption("v",  VERBOSE_OPTION,        "verbose output");
+            options.addOption("h",  HELP_OPTION,           "display this help message");
+            options.addOption("m",  MANIFEST_CHECK_OPTION, "only check and repair the leveled manifest, without actually scrubbing the sstables");
+            return options;
+        }
+
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <keyspace> <column_family>", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Scrub the sstable for the provided column family." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/utils/OutputHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/OutputHandler.java b/src/java/org/apache/cassandra/utils/OutputHandler.java
new file mode 100644
index 0000000..b203663
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/OutputHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface OutputHandler
+{
+    // called when important information needs to be displayed
+    public void output(String msg);
+
+    // called when less important information needs to be displayed
+    public void debug(String msg);
+
+    // called when the user needs to be warned
+    public void warn(String msg);
+    public void warn(String msg, Throwable th);
+
+    public static class LogOutput implements OutputHandler
+    {
+        private static Logger logger = LoggerFactory.getLogger(LogOutput.class);
+
+        public void output(String msg)
+        {
+            logger.info(msg);
+        }
+
+        public void debug(String msg)
+        {
+            logger.debug(msg);
+        }
+
+        public void warn(String msg)
+        {
+            logger.warn(msg);
+        }
+
+        public void warn(String msg, Throwable th)
+        {
+            logger.warn(msg, th);
+        }
+    }
+
+    public static class SystemOutput implements OutputHandler
+    {
+        public final boolean debug;
+        public final boolean printStack;
+
+        public SystemOutput(boolean debug, boolean printStack)
+        {
+            this.debug = debug;
+            this.printStack = printStack;
+        }
+
+        public void output(String msg)
+        {
+            System.out.println(msg);
+        }
+
+        public void debug(String msg)
+        {
+            if (debug)
+                System.out.println(msg);
+        }
+
+        public void warn(String msg)
+        {
+            warn(msg, null);
+        }
+
+        public void warn(String msg, Throwable th)
+        {
+            System.out.println("WARNING: " + msg);
+            if (printStack && th != null)
+                th.printStackTrace(System.out);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 7cc431d..abec718 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -30,10 +30,13 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CLibrary;
 
@@ -41,29 +44,33 @@ import static org.apache.cassandra.Util.column;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.util.*;
+
 public class ScrubTest extends SchemaLoader
 {
     public String TABLE = "Keyspace1";
     public String CF = "Standard1";
     public String CF2 = "Super5";
     public String CF3 = "Standard2";
-    public String  corruptSSTableName;
-
 
-    public void copySSTables() throws IOException
+    public String copySSTables(String cf) throws IOException
     {
         String root = System.getProperty("corrupt-sstable-root");
         assert root != null;
         File rootDir = new File(root);
         assert rootDir.isDirectory();
 
-        File destDir = Directories.create(TABLE, CF2).getDirectoryForNewSSTables(1);
+        File destDir = Directories.create(TABLE, cf).getDirectoryForNewSSTables(1);
+
+        String corruptSSTableName = null;
 
         FileUtils.createDirectory(destDir);
         for (File srcFile : rootDir.listFiles())
         {
             if (srcFile.getName().equals(".svn"))
                 continue;
+            if (!srcFile.getName().contains(cf))
+                continue;
             File destFile = new File(destDir, srcFile.getName());
             CLibrary.createHardLink(srcFile, destFile);
 
@@ -74,12 +81,13 @@ public class ScrubTest extends SchemaLoader
         }
 
         assert corruptSSTableName != null;
+        return corruptSSTableName;
     }
 
     @Test
     public void testScrubFile() throws Exception
     {
-        copySSTables();
+        copySSTables(CF2);
 
         Table table = Table.open(TABLE);
         ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2);
@@ -104,7 +112,6 @@ public class ScrubTest extends SchemaLoader
         assertEquals(100, rows.size());
     }
 
-
     @Test
     public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
     {
@@ -166,6 +173,57 @@ public class ScrubTest extends SchemaLoader
         assertEquals(10, rows.size());
     }
 
+    @Test
+    public void testScubOutOfOrder() throws Exception
+    {
+         CompactionManager.instance.disableAutoCompaction();
+         Table table = Table.open(TABLE);
+         String columnFamily = "Standard3";
+         ColumnFamilyStore cfs = table.getColumnFamilyStore(columnFamily);
+
+        /*
+         * Code used to generate an outOfOrder sstable. The test must be run without assertions for this to work.
+         * The test also assumes an ordered partitioner.
+         *
+         * ColumnFamily cf = ColumnFamily.create(TABLE, columnFamily);
+         * cf.addColumn(new Column(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L));
+
+         * SSTableWriter writer = cfs.createCompactionWriter((long)DatabaseDescriptor.getIndexInterval(), new File("."), Collections.<SSTableReader>emptyList());
+         * writer.append(Util.dk("a"), cf);
+         * writer.append(Util.dk("b"), cf);
+         * writer.append(Util.dk("z"), cf);
+         * writer.append(Util.dk("c"), cf);
+         * writer.append(Util.dk("y"), cf);
+         * writer.append(Util.dk("d"), cf);
+         * writer.closeAndOpenReader();
+         */
+
+        copySSTables(columnFamily);
+        cfs.loadNewSSTables();
+        assert cfs.getSSTables().size() > 0;
+
+        List<Row> rows;
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null);
+        assert !isRowOrdered(rows) : "'corrupt' test file actually was not";
+
+        CompactionManager.instance.performScrub(cfs);
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null);
+        assert isRowOrdered(rows) : "Scrub failed: " + rows;
+        assert rows.size() == 6: "Got " + rows.size();
+    }
+
+    private static boolean isRowOrdered(List<Row> rows)
+    {
+        DecoratedKey prev = null;
+        for (Row row : rows)
+        {
+            if (prev != null && prev.compareTo(row.key) > 0)
+                return false;
+            prev = row.key;
+        }
+        return true;
+    }
+
     protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
     {
         for (int i = 0; i < rowsPerSSTable; i++)