You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/02 15:10:46 UTC

[1/4] cassandra git commit: Avoid writing range tombstones after END_OF_ROW marker.

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.1 38eaf7f10 -> b97fa1101


Avoid writing range tombstones after END_OF_ROW marker.

Patch by Branimir Lambov; reviewed by marcuse for CASSANDRA-10791


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b26ca68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b26ca68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b26ca68

Branch: refs/heads/cassandra-3.1
Commit: 0b26ca68747cdecb907d7c238e04b39836efe3d1
Parents: 5175326
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 1 11:38:09 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 14:59:11 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  16 ++--
 .../org/apache/cassandra/db/RangeTombstone.java |   5 ++
 .../cassandra/db/compaction/Scrubber.java       |  25 +++++-
 .../io/sstable/SSTableIdentityIterator.java     |  79 +++++++++++++++----
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +
 .../Keyspace1-Standard3-jb-1-Summary.db         | Bin 71 -> 63 bytes
 .../Keyspace1-StandardInteger1-ka-2-CRC.db      | Bin 0 -> 8 bytes
 .../Keyspace1-StandardInteger1-ka-2-Data.db     | Bin 0 -> 12357 bytes
 .../Keyspace1-StandardInteger1-ka-2-Digest.sha1 |   1 +
 .../Keyspace1-StandardInteger1-ka-2-Filter.db   | Bin 0 -> 176 bytes
 .../Keyspace1-StandardInteger1-ka-2-Index.db    | Bin 0 -> 108 bytes
 ...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes
 .../Keyspace1-StandardInteger1-ka-2-Summary.db  | Bin 0 -> 80 bytes
 .../Keyspace1-StandardInteger1-ka-2-TOC.txt     |   8 ++
 .../apache/cassandra/db/RowIndexEntryTest.java  |   1 +
 .../unit/org/apache/cassandra/db/ScrubTest.java |  57 ++++++++++++-
 .../streaming/StreamingTransferTest.java        |  46 ++++++++++-
 18 files changed, 215 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b0f9588..e00abfe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
  * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 0ea5c87..f63dfe1 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -147,6 +147,7 @@ public class ColumnIndex
                 add(tombstone);
                 tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
             }
+            finishAddingAtoms();
             ColumnIndex index = build();
 
             maybeWriteEmptyRowHeader();
@@ -167,6 +168,7 @@ public class ColumnIndex
                 OnDiskAtom c =  columns.next();
                 add(c);
             }
+            finishAddingAtoms();
 
             return build();
         }
@@ -218,15 +220,19 @@ public class ColumnIndex
             }
         }
 
-        public ColumnIndex build() throws IOException
+        public void finishAddingAtoms() throws IOException
         {
-            // all columns were GC'd after all
-            if (lastColumn == null)
-                return ColumnIndex.EMPTY;
-
             long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer);
             endPosition += size;
             blockSize += size;
+        }
+
+        public ColumnIndex build()
+        {
+            assert !tombstoneTracker.hasUnwrittenTombstones();  // finishAddingAtoms must be called before building.
+            // all columns were GC'd after all
+            if (lastColumn == null)
+                return ColumnIndex.EMPTY;
 
             // the last column may have fallen on an index boundary already.  if not, index it explicitly.
             if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 4d22d48..5e41792 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -325,6 +325,11 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
             return false;
         }
 
+        public boolean hasUnwrittenTombstones()
+        {
+            return !unwrittenTombstones.isEmpty();
+        }
+
         /**
          * The tracker needs to track expired range tombstone but keep tracks that they are
          * expired, so this is what this class is used for.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 400df08..e02f901 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -155,6 +155,22 @@ public class Scrubber implements Closeable
                 if (scrubInfo.isStopRequested())
                     throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
 
+                updateIndexKey();
+
+                if (prevKey != null && indexFile != null)
+                {
+                    long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+                    if (dataFile.getFilePointer() < nextRowStart)
+                    {
+                        // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+                        saveOutOfOrderRow(prevKey,
+                                          SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
+                                          String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+                        if (dataFile.isEOF())
+                            break;
+                    }
+                }
+
                 long rowStart = dataFile.getFilePointer();
                 outputHandler.debug("Reading row at " + rowStart);
 
@@ -170,8 +186,6 @@ public class Scrubber implements Closeable
                     // check for null key below
                 }
 
-                updateIndexKey();
-
                 long dataStart = dataFile.getFilePointer();
 
                 long dataStartFromIndex = -1;
@@ -369,8 +383,13 @@ public class Scrubber implements Closeable
 
     private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
     {
+        saveOutOfOrderRow(key, atoms, String.format("Out of order row detected (%s found after %s)", key, prevKey));
+    }
+
+    void saveOutOfOrderRow(DecoratedKey key, SSTableIdentityIterator atoms, String message)
+    {
         // TODO bitch if the row is too large?  if it is there's not much we can do ...
-        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        outputHandler.warn(message);
         // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
         // and there's no sense in failing on mis-sorted cells when a TreeMap could safe us
         ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 498ad26..45994d0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -20,9 +20,13 @@ package org.apache.cassandra.io.sstable;
 import java.io.*;
 import java.util.Iterator;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.sstable.Descriptor.Version;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.serializers.MarshalException;
 
@@ -66,6 +70,35 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
     }
 
+    /**
+     * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+     */
+    public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+    {
+        final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+        final CellNameType type = sstable.metadata.comparator;
+        final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+        final Version version = sstable.descriptor.version;
+        final long dataEnd = file.getFilePointer() + dataSize;
+        return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
+                                           new AbstractIterator<OnDiskAtom>()
+                                                   {
+                                                       protected OnDiskAtom computeNext()
+                                                       {
+                                                           if (file.getFilePointer() >= dataEnd)
+                                                               return endOfData();
+                                                           try
+                                                           {
+                                                               return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+                                                           }
+                                                           catch (IOException e)
+                                                           {
+                                                               throw new IOError(e);
+                                                           }
+                                                       }
+                                                   });
+    }
+
     // sstable may be null *if* checkData is false
     // If it is null, we assume the data is in the current file format
     private SSTableIdentityIterator(CFMetaData metadata,
@@ -77,23 +110,15 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
                                     SSTableReader sstable,
                                     ColumnSerializer.Flag flag)
     {
-        assert !checkData || (sstable != null);
-        this.in = in;
-        this.filename = filename;
-        this.key = key;
-        this.dataSize = dataSize;
-        this.flag = flag;
-        this.validateColumns = checkData;
-        this.sstable = sstable;
-
-        Descriptor.Version dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version;
-        int expireBefore = (int) (System.currentTimeMillis() / 1000);
-        columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+        this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
+             metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
+    }
 
+    private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+    {
         try
         {
-            columnFamily.delete(DeletionTime.serializer.deserialize(in));
-            atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+            return DeletionTime.serializer.deserialize(in);
         }
         catch (IOException e)
         {
@@ -103,6 +128,32 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
         }
     }
 
+    // sstable may be null *if* checkData is false
+    // If it is null, we assume the data is in the current file format
+    private SSTableIdentityIterator(CFMetaData metadata,
+                                    DataInput in,
+                                    String filename,
+                                    DecoratedKey key,
+                                    long dataSize,
+                                    boolean checkData,
+                                    SSTableReader sstable,
+                                    ColumnSerializer.Flag flag,
+                                    DeletionTime deletion,
+                                    Iterator<OnDiskAtom> atomIterator)
+    {
+        assert !checkData || (sstable != null);
+        this.in = in;
+        this.filename = filename;
+        this.key = key;
+        this.dataSize = dataSize;
+        this.flag = flag;
+        this.validateColumns = checkData;
+        this.sstable = sstable;
+        columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+        columnFamily.delete(deletion);
+        this.atomIterator = atomIterator;
+    }
+
     public DecoratedKey getKey()
     {
         return key;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8e0b5f7..8620f30 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -319,6 +320,7 @@ public class SSTableWriter extends SSTable
 
                 columnIndexer.add(atom); // This write the atom on disk too
             }
+            columnIndexer.finishAddingAtoms();
 
             columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
index 376ca9d..7621f07 100644
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
new file mode 100644
index 0000000..fc23cfe
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
new file mode 100644
index 0000000..a4157d3
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
new file mode 100644
index 0000000..fb42fa9
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
@@ -0,0 +1 @@
+3265926428
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
new file mode 100644
index 0000000..eb0ae30
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
new file mode 100644
index 0000000..69a2fce
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
new file mode 100644
index 0000000..1cba196
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
new file mode 100644
index 0000000..22cfa6a
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
new file mode 100644
index 0000000..503f64d
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
@@ -0,0 +1,8 @@
+Digest.sha1
+Summary.db
+Filter.db
+Index.db
+Statistics.db
+Data.db
+CRC.db
+TOC.txt

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 237573e..ce58e11 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -58,6 +58,7 @@ public class RowIndexEntryTest extends SchemaLoader
                 add(column);
             }
             while (size < DatabaseDescriptor.getColumnIndexSize() * 3);
+            finishAddingAtoms();
 
         }}.build();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index f8acd22..167671b 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.UUIDGen;
+
 import org.apache.commons.lang3.StringUtils;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,6 +52,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -60,7 +63,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.cellname;
 import static org.apache.cassandra.Util.column;
-
 import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -362,6 +364,59 @@ public class ScrubTest extends SchemaLoader
         assert rows.size() == 6 : "Got " + rows.size();
     }
 
+    @Test
+    public void testScrub10791() throws Exception
+    {
+        // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        String columnFamily = "StandardInteger1";
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+        cfs.clearUnsafe();
+
+        String root = System.getProperty("corrupt-sstable-root");
+        assert root != null;
+        File rootDir = new File(root);
+        assert rootDir.isDirectory();
+        Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
+        CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
+
+        // open without validation for scrubbing
+        Set<Component> components = new HashSet<>();
+        components.add(Component.DATA);
+        components.add(Component.PRIMARY_INDEX);
+        components.add(Component.FILTER);
+        components.add(Component.STATS);
+        components.add(Component.SUMMARY);
+        components.add(Component.TOC);
+        SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
+
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
+        scrubber.scrub();
+
+        cfs.loadNewSSTables();
+        assertEquals(7, countCells(cfs));
+    }
+
+    private int countCells(ColumnFamilyStore cfs)
+    {
+        int cellCount = 0;
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+            while (it.hasNext())
+            {
+                Iterator<OnDiskAtom> itr = it.next();
+                while (itr.hasNext())
+                {
+                    ++cellCount;
+                    itr.next();
+                }
+            }
+        }
+        return cellCount;
+    }
+
     private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
     {
         boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 06ebdd3..31dc492 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -26,9 +26,11 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.Scrubber;
+import org.apache.cassandra.db.compaction.Scrubber.ScrubResult;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -51,8 +56,9 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
-
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.concurrent.Refs;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.apache.cassandra.Util.cellname;
@@ -265,7 +271,7 @@ public class StreamingTransferTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open(ks);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 
-        String key = "key1";
+        String key = "key0";
         Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
         // add columns of size slightly less than column_index_size to force insert column index
         rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
@@ -274,9 +280,21 @@ public class StreamingTransferTest extends SchemaLoader
         // add RangeTombstones
         cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
         cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+        cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+        rm.apply();
+
+        key = "key1";
+        rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+        // add columns of size slightly less than column_index_size to force insert column index
+        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+        cf = rm.addOrGet(cfname);
+        // add RangeTombstones
+        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
         rm.apply();
+
         cfs.forceBlockingFlush();
 
+        int cellCount = countCells(cfs);
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         cfs.clearUnsafe();
         transferSSTables(sstable);
@@ -284,8 +302,30 @@ public class StreamingTransferTest extends SchemaLoader
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
 
+        // Verify table
+        assertEquals(cellCount, countCells(cfs));
+
         List<Row> rows = Util.getRangeSlice(cfs);
-        assertEquals(1, rows.size());
+        assertEquals(2, rows.size());
+    }
+
+    private int countCells(ColumnFamilyStore cfs)
+    {
+        int cellCount = 0;
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+            while (it.hasNext())
+            {
+                Iterator<OnDiskAtom> itr = it.next();
+                while (itr.hasNext())
+                {
+                    ++cellCount;
+                    itr.next();
+                }
+            }
+        }
+        return cellCount;
     }
 
     @Test


[3/4] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb6973a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb6973a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb6973a6

Branch: refs/heads/cassandra-3.1
Commit: bb6973a6fe132249031a281788f232c975e3456f
Parents: b950c38 882df8a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 15:09:09 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 15:09:09 2015 +0100

----------------------------------------------------------------------

----------------------------------------------------------------------



[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1

Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b97fa110
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b97fa110
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b97fa110

Branch: refs/heads/cassandra-3.1
Commit: b97fa1101e2ad8d4517117113ac2a40a5387e289
Parents: 38eaf7f bb6973a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 15:09:19 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 15:09:19 2015 +0100

----------------------------------------------------------------------

----------------------------------------------------------------------



[2/4] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/882df8a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/882df8a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/882df8a2

Branch: refs/heads/cassandra-3.1
Commit: 882df8a21711559d18bf6b38cd6026d78b7e4956
Parents: 29ed6fe 0b26ca6
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 15:06:43 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 15:06:43 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  16 ++--
 .../org/apache/cassandra/db/RangeTombstone.java |   5 ++
 .../cassandra/db/compaction/Scrubber.java       |  25 +++++-
 .../io/sstable/SSTableIdentityIterator.java     |  76 +++++++++++++++----
 .../io/sstable/format/big/BigTableWriter.java   |   1 +
 .../Keyspace1-Standard3-jb-1-Summary.db         | Bin 71 -> 63 bytes
 .../Keyspace1-StandardInteger1-ka-2-CRC.db      | Bin 0 -> 8 bytes
 .../Keyspace1-StandardInteger1-ka-2-Data.db     | Bin 0 -> 12357 bytes
 .../Keyspace1-StandardInteger1-ka-2-Digest.sha1 |   1 +
 .../Keyspace1-StandardInteger1-ka-2-Filter.db   | Bin 0 -> 176 bytes
 .../Keyspace1-StandardInteger1-ka-2-Index.db    | Bin 0 -> 108 bytes
 ...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes
 .../Keyspace1-StandardInteger1-ka-2-Summary.db  | Bin 0 -> 112 bytes
 .../Keyspace1-StandardInteger1-ka-2-TOC.txt     |   8 ++
 .../apache/cassandra/db/RowIndexEntryTest.java  |   1 +
 .../unit/org/apache/cassandra/db/ScrubTest.java |  60 +++++++++++++++
 .../streaming/StreamingTransferTest.java        |  46 ++++++++++-
 18 files changed, 216 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eaad3a2,e00abfe..bca5fb0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,5 +1,24 @@@
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
 +Merged from 2.1:
+  * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
   * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
   * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 670c682,e02f901..9fd8560
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -162,6 -155,22 +162,22 @@@ public class Scrubber implements Closea
                  if (scrubInfo.isStopRequested())
                      throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
  
+                 updateIndexKey();
+ 
+                 if (prevKey != null && indexFile != null)
+                 {
+                     long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+                     if (dataFile.getFilePointer() < nextRowStart)
+                     {
+                         // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+                         saveOutOfOrderRow(prevKey,
 -                                          SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
++                                          SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), checkData),
+                                           String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+                         if (dataFile.isEOF())
+                             break;
+                     }
+                 }
+ 
                  long rowStart = dataFile.getFilePointer();
                  outputHandler.debug("Reading row at " + rowStart);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 17f9a8d,45994d0..8c02ee7
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@@ -20,13 -20,13 +20,16 @@@ package org.apache.cassandra.io.sstable
  import java.io.*;
  import java.util.Iterator;
  
+ import com.google.common.collect.AbstractIterator;
+ 
  import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+ import org.apache.cassandra.db.composites.CellNameType;
 -import org.apache.cassandra.io.sstable.Descriptor.Version;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.FileDataInput;
  import org.apache.cassandra.io.util.RandomAccessReader;
  import org.apache.cassandra.serializers.MarshalException;
  
@@@ -60,13 -62,43 +63,42 @@@
       * @param sstable SSTable we are reading ffrom.
       * @param file Reading using this file.
       * @param key Key of this row.
 -     * @param dataSize length of row data
       * @param checkData if true, do its best to deserialize and check the coherence of row data
       */
 -    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
 +    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, boolean checkData)
      {
 -        this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
 +        this(sstable.metadata, file, file.getPath(), key, checkData, sstable, ColumnSerializer.Flag.LOCAL);
      }
  
+     /**
+      * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+      */
+     public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+     {
+         final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+         final CellNameType type = sstable.metadata.comparator;
+         final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+         final Version version = sstable.descriptor.version;
+         final long dataEnd = file.getFilePointer() + dataSize;
 -        return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
++        return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, checkData, sstable, flag, DeletionTime.LIVE,
+                                            new AbstractIterator<OnDiskAtom>()
+                                                    {
+                                                        protected OnDiskAtom computeNext()
+                                                        {
+                                                            if (file.getFilePointer() >= dataEnd)
+                                                                return endOfData();
+                                                            try
+                                                            {
+                                                                return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+                                                            }
+                                                            catch (IOException e)
+                                                            {
+                                                                throw new IOError(e);
+                                                            }
+                                                        }
+                                                    });
+     }
+ 
      // sstable may be null *if* checkData is false
      // If it is null, we assume the data is in the current file format
      private SSTableIdentityIterator(CFMetaData metadata,
@@@ -77,22 -110,15 +109,16 @@@
                                      SSTableReader sstable,
                                      ColumnSerializer.Flag flag)
      {
-         assert !checkData || (sstable != null);
-         this.in = in;
-         this.filename = filename;
-         this.key = key;
-         this.flag = flag;
-         this.validateColumns = checkData;
-         this.sstable = sstable;
- 
-         Version dataVersion = sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version;
-         int expireBefore = (int) (System.currentTimeMillis() / 1000);
-         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
 -        this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
 -             metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
++        this(metadata, in, filename, key, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
++             metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000),
++                                        sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version));
+     }
  
+     private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+     {
          try
          {
-             columnFamily.delete(DeletionTime.serializer.deserialize(in));
-             atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+             return DeletionTime.serializer.deserialize(in);
          }
          catch (IOException e)
          {
@@@ -102,6 -128,32 +128,30 @@@
          }
      }
  
+     // sstable may be null *if* checkData is false
+     // If it is null, we assume the data is in the current file format
+     private SSTableIdentityIterator(CFMetaData metadata,
+                                     DataInput in,
+                                     String filename,
+                                     DecoratedKey key,
 -                                    long dataSize,
+                                     boolean checkData,
+                                     SSTableReader sstable,
+                                     ColumnSerializer.Flag flag,
+                                     DeletionTime deletion,
+                                     Iterator<OnDiskAtom> atomIterator)
+     {
+         assert !checkData || (sstable != null);
+         this.in = in;
+         this.filename = filename;
+         this.key = key;
 -        this.dataSize = dataSize;
+         this.flag = flag;
+         this.validateColumns = checkData;
+         this.sstable = sstable;
+         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+         columnFamily.delete(deletion);
+         this.atomIterator = atomIterator;
+     }
+ 
      public DecoratedKey getKey()
      {
          return key;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index d064e69,0000000..505bac0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,587 -1,0 +1,588 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +import static org.apache.cassandra.utils.Throwables.merge;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private final IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
 +        }
 +        iwriter = new IndexWriter(keyCount, dataFile);
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataEnd);
 +        iwriter.append(decoratedKey, index, dataEnd);
 +        dbuilder.addPotentialBoundary(dataEnd);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
 +        long startPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
 +            entry = row.write(startPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +            long endPosition = dataFile.getFilePointer();
 +            long rowSize = endPosition - startPosition;
 +            maybeLogLargePartitionWarning(row.key, rowSize);
 +            metadataCollector.update(rowSize, row.columnStats());
 +            afterAppend(row.key, endPosition, entry);
 +            return entry;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
 +        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
 +        {
 +            logger.error("Key size {} exceeds maximum of {}, skipping row",
 +                         decoratedKey.getKey().remaining(),
 +                         FBUtilities.MAX_UNSIGNED_SHORT);
 +            return;
 +        }
 +
 +        long startPosition = beforeAppend(decoratedKey);
 +        long endPosition;
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
 +            endPosition = dataFile.getFilePointer();
 +            afterAppend(decoratedKey, endPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        long rowSize = endPosition - startPosition;
 +        maybeLogLargePartitionWarning(decoratedKey, rowSize);
 +        metadataCollector.update(endPosition - startPosition, cf.getColumnStats());
 +    }
 +
 +    private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
 +    {
 +        if (rowSize > DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
 +        {
 +            String keyString = metadata.getKeyValidator().getString(key.getKey());
 +            logger.warn("Writing large partition {}/{}:{} ({} bytes)", metadata.ksName, metadata.cfName, keyString, rowSize);
 +        }
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
 +        {
 +            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // This write the atom on disk too
 +            }
++            columnIndexer.finishAddingAtoms();
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    private Descriptor makeTmpLinks()
 +    {
 +        // create temp links if they don't already exist
 +        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
 +        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
 +        {
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
 +        }
 +        return link;
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public SSTableReader openEarly()
 +    {
 +        // find the max (exclusive) readable key
 +        IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
 +        if (boundary == null)
 +            return null;
 +
 +        StatsMetadata stats = statsMetadata();
 +        assert boundary.indexLength > 0 && boundary.dataLength > 0;
 +        Descriptor link = makeTmpLinks();
 +        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
 +        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength);
 +        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), boundary.dataLength);
 +        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, metadata,
 +                                                           partitioner, ifile,
 +                                                           dfile, iwriter.summary.build(partitioner, boundary),
 +                                                           iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(boundary.lastKey);
 +        return sstable;
 +    }
 +
 +    public SSTableReader openFinalEarly()
 +    {
 +        // we must ensure the data is completely flushed to disk
 +        dataFile.sync();
 +        iwriter.indexFile.sync();
 +        return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
 +    {
 +        if (maxDataAge < 0)
 +            maxDataAge = System.currentTimeMillis();
 +
 +        StatsMetadata stats = statsMetadata();
 +        // finalize in-memory state for the reader
 +        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX));
 +        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA));
 +        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
 +                                                           components,
 +                                                           this.metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           iwriter.summary.build(partitioner),
 +                                                           iwriter.bf.sharedCopy(),
 +                                                           maxDataAge,
 +                                                           stats,
 +                                                           openReason);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
 +        return sstable;
 +    }
 +
 +    protected SSTableWriter.TransactionalProxy txnProxy()
 +    {
 +        return new TransactionalProxy();
 +    }
 +
 +    class TransactionalProxy extends SSTableWriter.TransactionalProxy
 +    {
 +        // finalise our state on disk, including renaming
 +        protected void doPrepare()
 +        {
 +            iwriter.prepareToCommit();
 +
 +            // write sstable statistics
 +            dataFile.setDescriptor(descriptor).prepareToCommit();
 +            writeMetadata(descriptor, finalizeMetadata());
 +
 +            // save the table of components
 +            SSTable.appendTOC(descriptor, components);
 +
 +            // rename to final
 +            rename(descriptor, components);
 +
 +            if (openResult)
 +                finalReader = openFinal(descriptor.asType(Descriptor.Type.FINAL), SSTableReader.OpenReason.NORMAL);
 +        }
 +
 +        protected Throwable doCommit(Throwable accumulate)
 +        {
 +            accumulate = dataFile.commit(accumulate);
 +            accumulate = iwriter.commit(accumulate);
 +            return accumulate;
 +        }
 +
 +        @Override
 +        protected Throwable doPreCleanup(Throwable accumulate)
 +        {
 +            accumulate = dbuilder.close(accumulate);
 +            return accumulate;
 +        }
 +
 +        protected Throwable doAbort(Throwable accumulate)
 +        {
 +            accumulate = iwriter.abort(accumulate);
 +            accumulate = dataFile.abort(accumulate);
 +
 +            accumulate = delete(descriptor, accumulate);
 +            if (!openResult)
 +                accumulate = delete(descriptor.asType(Descriptor.Type.FINAL), accumulate);
 +            return accumulate;
 +        }
 +
 +        private Throwable delete(Descriptor desc, Throwable accumulate)
 +        {
 +            try
 +            {
 +                Set<Component> components = SSTable.discoverComponentsFor(desc);
 +                if (!components.isEmpty())
 +                    SSTable.delete(desc, components);
 +            }
 +            catch (Throwable t)
 +            {
 +                logger.error(String.format("Failed deleting temp components for %s", descriptor), t);
 +                accumulate = merge(accumulate, t);
 +            }
 +            return accumulate;
 +        }
 +    }
 +
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
 +    {
 +        File file = new File(desc.filenameFor(Component.STATS));
 +        try (SequentialWriter out = SequentialWriter.open(file);)
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +            out.setDescriptor(desc).finish();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, file.getPath());
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 +     */
 +    class IndexWriter extends AbstractTransactional implements Transactional
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
 +        IndexWriter(long keyCount, final SequentialWriter dataFile)
 +        {
 +            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
 +            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
 +            // register listeners to be alerted when the data files are flushed
 +            indexFile.setPostFlushListener(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    summary.markIndexSynced(indexFile.getLastFlushOffset());
 +                }
 +            });
 +            dataFile.setPostFlushListener(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    summary.markDataSynced(dataFile.getLastFlushOffset());
 +                }
 +            });
 +        }
 +
 +        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
 +        IndexSummaryBuilder.ReadableBoundary getMaxReadable()
 +        {
 +            return summary.getLastReadableBoundary();
 +        }
 +
 +        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException
 +        {
 +            bf.add(key);
 +            long indexStart = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
 +            long indexEnd = indexFile.getFilePointer();
 +
 +            if (logger.isTraceEnabled())
 +                logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
 +
 +            summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
 +            builder.addPotentialBoundary(indexStart);
 +        }
 +
 +        /**
 +         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
 +         */
 +        void flushBf()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try (FileOutputStream fos = new FileOutputStream(path);
 +                     DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
 +                {
 +                    // bloom filter
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    SyncUtil.sync(fos);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
 +            // we can't reset dbuilder either, but that is the last thing called in afterappend so
 +            // we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +
 +        protected void doPrepare()
 +        {
 +            flushBf();
 +
 +            // truncate index file
 +            long position = iwriter.indexFile.getFilePointer();
 +            iwriter.indexFile.setDescriptor(descriptor).prepareToCommit();
 +            FileUtils.truncate(iwriter.indexFile.getPath(), position);
 +
 +            // save summary
 +            summary.prepareToCommit();
 +            try (IndexSummary summary = iwriter.summary.build(partitioner))
 +            {
 +                SSTableReader.saveSummary(descriptor, first, last, iwriter.builder, dbuilder, summary);
 +            }
 +        }
 +
 +        protected Throwable doCommit(Throwable accumulate)
 +        {
 +            return indexFile.commit(accumulate);
 +        }
 +
 +        protected Throwable doAbort(Throwable accumulate)
 +        {
 +            return indexFile.abort(accumulate);
 +        }
 +
 +        @Override
 +        protected Throwable doPreCleanup(Throwable accumulate)
 +        {
 +            accumulate = summary.close(accumulate);
 +            accumulate = bf.close(accumulate);
 +            accumulate = builder.close(accumulate);
 +            return accumulate;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --cc test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
index 0000000,22cfa6a..190922a
mode 000000,100644..100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 0d90354,167671b..b69a1f8
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -20,98 -20,68 +20,103 @@@ package org.apache.cassandra.db
   *
   */
  
 -import java.io.*;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Modifier;
  import java.nio.ByteBuffer;
 -import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.Iterator;
 -import java.util.List;
 -import java.util.Set;
 +import java.util.*;
  import java.util.concurrent.ExecutionException;
  
 +import java.io.File;
 +import java.io.IOError;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.db.composites.CellNameType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.CounterColumnType;
 +import org.apache.cassandra.db.marshal.UUIDType;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.db.marshal.CompositeType;
 -import org.apache.cassandra.db.marshal.LongType;
 -import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.exceptions.RequestExecutionException;
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.utils.UUIDGen;
+ 
  import org.apache.commons.lang3.StringUtils;
+ 
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  
 -import org.apache.cassandra.OrderedJUnit4ClassRunner;
 -import org.apache.cassandra.cql3.UntypedResultSet;
 -import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.Util;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.Scrubber;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.sstable.SSTableRewriter;
 +import org.apache.cassandra.OrderedJUnit4ClassRunner;
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
 +import static org.junit.Assert.*;
 +import static org.junit.Assume.assumeTrue;
 +
  import static org.apache.cassandra.Util.cellname;
  import static org.apache.cassandra.Util.column;
 -import static junit.framework.Assert.assertNotNull;
  import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
 -import static org.junit.Assume.assumeTrue;
  
  @RunWith(OrderedJUnit4ClassRunner.class)
 -public class ScrubTest extends SchemaLoader
 +public class ScrubTest
  {
 -    public String KEYSPACE = "Keyspace1";
 -    public String CF = "Standard1";
 -    public String CF3 = "Standard2";
 -    public String COUNTER_CF = "Counter1";
 -    private static Integer COMPRESSION_CHUNK_LENGTH = 4096;
 +    public static final String KEYSPACE = "Keyspace1";
 +    public static final String CF = "Standard1";
 +    public static final String CF2 = "Standard2";
 +    public static final String CF3 = "Standard3";
++    public static final String CFI1 = "StandardInteger1";
 +    public static final String COUNTER_CF = "Counter1";
 +    public static final String CF_UUID = "UUIDKeys";
 +    public static final String CF_INDEX1 = "Indexed1";
 +    public static final String CF_INDEX2 = "Indexed2";
 +
 +    public static final String COL_KEYS_INDEX = "birthdate";
 +    public static final String COL_COMPOSITES_INDEX = "col1";
 +    public static final String COL_NON_INDEX = "notanindexcol";
 +
 +    public static final Integer COMPRESSION_CHUNK_LENGTH = 4096;
  
      @BeforeClass
 -    public static void loadSchema() throws ConfigurationException
 +    public static void defineSchema() throws ConfigurationException
      {
 -        loadSchema(COMPRESSION_CHUNK_LENGTH);
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.createKeyspace(KEYSPACE,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF2),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF3),
++                                    SchemaLoader.standardCFMD(KEYSPACE, CFI1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, COUNTER_CF)
 +                                                .defaultValidator(CounterColumnType.instance)
 +                                                .compressionParameters(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance),
 +                                    SchemaLoader.indexCFMD(KEYSPACE, CF_INDEX1, true),
 +                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true));
      }
  
      @Test
@@@ -422,6 -364,59 +427,61 @@@
          assert rows.size() == 6 : "Got " + rows.size();
      }
  
+     @Test
+     public void testScrub10791() throws Exception
+     {
+         // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
 -        String columnFamily = "StandardInteger1";
++        String columnFamily = CFI1;
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+         cfs.clearUnsafe();
+ 
+         String root = System.getProperty("corrupt-sstable-root");
+         assert root != null;
+         File rootDir = new File(root);
+         assert rootDir.isDirectory();
 -        Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
 -        CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
++        Descriptor desc = new Descriptor("ka", rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY);
+ 
+         // open without validation for scrubbing
+         Set<Component> components = new HashSet<>();
+         components.add(Component.DATA);
+         components.add(Component.PRIMARY_INDEX);
+         components.add(Component.FILTER);
+         components.add(Component.STATS);
+         components.add(Component.SUMMARY);
+         components.add(Component.TOC);
 -        SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
++        SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs);
+ 
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
 -        scrubber.scrub();
++        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
++             Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);)
++        {
++            scrubber.scrub();
++        }
+ 
+         cfs.loadNewSSTables();
+         assertEquals(7, countCells(cfs));
+     }
+ 
+     private int countCells(ColumnFamilyStore cfs)
+     {
+         int cellCount = 0;
+         for (SSTableReader sstable : cfs.getSSTables())
+         {
+             Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+             while (it.hasNext())
+             {
+                 Iterator<OnDiskAtom> itr = it.next();
+                 while (itr.hasNext())
+                 {
+                     ++cellCount;
+                     itr.next();
+                 }
+             }
+         }
+         return cellCount;
+     }
+ 
      private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
      {
          boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 3c799e2,31dc492..e751968
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -26,7 -26,7 +26,8 @@@ import java.util.concurrent.TimeUnit
  import com.google.common.collect.Iterables;
  import com.google.common.util.concurrent.FutureCallback;
  import com.google.common.util.concurrent.Futures;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ 
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
@@@ -306,9 -280,21 +312,21 @@@ public class StreamingTransferTes
          // add RangeTombstones
          cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
          cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+         cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.apply();
++        rm.applyUnsafe();
+ 
+         key = "key1";
+         rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+         // add columns of size slightly less than column_index_size to force insert column index
+         rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+         cf = rm.addOrGet(cfname);
+         // add RangeTombstones
+         cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.apply();
 +        rm.applyUnsafe();
+ 
          cfs.forceBlockingFlush();
  
+         int cellCount = countCells(cfs);
          SSTableReader sstable = cfs.getSSTables().iterator().next();
          cfs.clearUnsafe();
          transferSSTables(sstable);