You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/02 15:10:01 UTC
cassandra git commit: Avoid writing range tombstones after END_OF_ROW
marker.
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 51753263a -> 0b26ca687
Avoid writing range tombstones after END_OF_ROW marker.
Patch by Branimir Lambov; reviewed by marcuse for CASSANDRA-10791
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b26ca68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b26ca68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b26ca68
Branch: refs/heads/cassandra-2.1
Commit: 0b26ca68747cdecb907d7c238e04b39836efe3d1
Parents: 5175326
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 1 11:38:09 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 14:59:11 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 16 ++--
.../org/apache/cassandra/db/RangeTombstone.java | 5 ++
.../cassandra/db/compaction/Scrubber.java | 25 +++++-
.../io/sstable/SSTableIdentityIterator.java | 79 +++++++++++++++----
.../cassandra/io/sstable/SSTableWriter.java | 2 +
.../Keyspace1-Standard3-jb-1-Summary.db | Bin 71 -> 63 bytes
.../Keyspace1-StandardInteger1-ka-2-CRC.db | Bin 0 -> 8 bytes
.../Keyspace1-StandardInteger1-ka-2-Data.db | Bin 0 -> 12357 bytes
.../Keyspace1-StandardInteger1-ka-2-Digest.sha1 | 1 +
.../Keyspace1-StandardInteger1-ka-2-Filter.db | Bin 0 -> 176 bytes
.../Keyspace1-StandardInteger1-ka-2-Index.db | Bin 0 -> 108 bytes
...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes
.../Keyspace1-StandardInteger1-ka-2-Summary.db | Bin 0 -> 80 bytes
.../Keyspace1-StandardInteger1-ka-2-TOC.txt | 8 ++
.../apache/cassandra/db/RowIndexEntryTest.java | 1 +
.../unit/org/apache/cassandra/db/ScrubTest.java | 57 ++++++++++++-
.../streaming/StreamingTransferTest.java | 46 ++++++++++-
18 files changed, 215 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b0f9588..e00abfe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
* Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
* Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 0ea5c87..f63dfe1 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -147,6 +147,7 @@ public class ColumnIndex
add(tombstone);
tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
}
+ finishAddingAtoms();
ColumnIndex index = build();
maybeWriteEmptyRowHeader();
@@ -167,6 +168,7 @@ public class ColumnIndex
OnDiskAtom c = columns.next();
add(c);
}
+ finishAddingAtoms();
return build();
}
@@ -218,15 +220,19 @@ public class ColumnIndex
}
}
- public ColumnIndex build() throws IOException
+ public void finishAddingAtoms() throws IOException
{
- // all columns were GC'd after all
- if (lastColumn == null)
- return ColumnIndex.EMPTY;
-
long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer);
endPosition += size;
blockSize += size;
+ }
+
+ public ColumnIndex build()
+ {
+ assert !tombstoneTracker.hasUnwrittenTombstones(); // finishAddingAtoms must be called before building.
+ // all columns were GC'd after all
+ if (lastColumn == null)
+ return ColumnIndex.EMPTY;
// the last column may have fallen on an index boundary already. if not, index it explicitly.
if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 4d22d48..5e41792 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -325,6 +325,11 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
return false;
}
+ public boolean hasUnwrittenTombstones()
+ {
+ return !unwrittenTombstones.isEmpty();
+ }
+
/**
* The tracker needs to track expired range tombstone but keep tracks that they are
* expired, so this is what this class is used for.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 400df08..e02f901 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -155,6 +155,22 @@ public class Scrubber implements Closeable
if (scrubInfo.isStopRequested())
throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
+ updateIndexKey();
+
+ if (prevKey != null && indexFile != null)
+ {
+ long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+ if (dataFile.getFilePointer() < nextRowStart)
+ {
+ // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+ saveOutOfOrderRow(prevKey,
+ SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
+ String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+ if (dataFile.isEOF())
+ break;
+ }
+ }
+
long rowStart = dataFile.getFilePointer();
outputHandler.debug("Reading row at " + rowStart);
@@ -170,8 +186,6 @@ public class Scrubber implements Closeable
// check for null key below
}
- updateIndexKey();
-
long dataStart = dataFile.getFilePointer();
long dataStartFromIndex = -1;
@@ -369,8 +383,13 @@ public class Scrubber implements Closeable
private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
{
+ saveOutOfOrderRow(key, atoms, String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ }
+
+ void saveOutOfOrderRow(DecoratedKey key, SSTableIdentityIterator atoms, String message)
+ {
// TODO bitch if the row is too large? if it is there's not much we can do ...
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ outputHandler.warn(message);
// adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
// and there's no sense in failing on mis-sorted cells when a TreeMap could save us
ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 498ad26..45994d0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -20,9 +20,13 @@ package org.apache.cassandra.io.sstable;
import java.io.*;
import java.util.Iterator;
+import com.google.common.collect.AbstractIterator;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.sstable.Descriptor.Version;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.serializers.MarshalException;
@@ -66,6 +70,35 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
}
+ /**
+ * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+ */
+ public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+ {
+ final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+ final CellNameType type = sstable.metadata.comparator;
+ final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+ final Version version = sstable.descriptor.version;
+ final long dataEnd = file.getFilePointer() + dataSize;
+ return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
+ new AbstractIterator<OnDiskAtom>()
+ {
+ protected OnDiskAtom computeNext()
+ {
+ if (file.getFilePointer() >= dataEnd)
+ return endOfData();
+ try
+ {
+ return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ });
+ }
+
// sstable may be null *if* checkData is false
// If it is null, we assume the data is in the current file format
private SSTableIdentityIterator(CFMetaData metadata,
@@ -77,23 +110,15 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
SSTableReader sstable,
ColumnSerializer.Flag flag)
{
- assert !checkData || (sstable != null);
- this.in = in;
- this.filename = filename;
- this.key = key;
- this.dataSize = dataSize;
- this.flag = flag;
- this.validateColumns = checkData;
- this.sstable = sstable;
-
- Descriptor.Version dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version;
- int expireBefore = (int) (System.currentTimeMillis() / 1000);
- columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+ this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
+ metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
+ }
+ private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+ {
try
{
- columnFamily.delete(DeletionTime.serializer.deserialize(in));
- atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+ return DeletionTime.serializer.deserialize(in);
}
catch (IOException e)
{
@@ -103,6 +128,32 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
}
}
+ // sstable may be null *if* checkData is false
+ // If it is null, we assume the data is in the current file format
+ private SSTableIdentityIterator(CFMetaData metadata,
+ DataInput in,
+ String filename,
+ DecoratedKey key,
+ long dataSize,
+ boolean checkData,
+ SSTableReader sstable,
+ ColumnSerializer.Flag flag,
+ DeletionTime deletion,
+ Iterator<OnDiskAtom> atomIterator)
+ {
+ assert !checkData || (sstable != null);
+ this.in = in;
+ this.filename = filename;
+ this.key = key;
+ this.dataSize = dataSize;
+ this.flag = flag;
+ this.validateColumns = checkData;
+ this.sstable = sstable;
+ columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+ columnFamily.delete(deletion);
+ this.atomIterator = atomIterator;
+ }
+
public DecoratedKey getKey()
{
return key;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8e0b5f7..8620f30 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -319,6 +320,7 @@ public class SSTableWriter extends SSTable
columnIndexer.add(atom); // This write the atom on disk too
}
+ columnIndexer.finishAddingAtoms();
columnIndexer.maybeWriteEmptyRowHeader();
dataFile.stream.writeShort(END_OF_ROW);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
index 376ca9d..7621f07 100644
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
new file mode 100644
index 0000000..fc23cfe
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
new file mode 100644
index 0000000..a4157d3
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
new file mode 100644
index 0000000..fb42fa9
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
@@ -0,0 +1 @@
+3265926428
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
new file mode 100644
index 0000000..eb0ae30
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
new file mode 100644
index 0000000..69a2fce
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
new file mode 100644
index 0000000..1cba196
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
new file mode 100644
index 0000000..22cfa6a
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
new file mode 100644
index 0000000..503f64d
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
@@ -0,0 +1,8 @@
+Digest.sha1
+Summary.db
+Filter.db
+Index.db
+Statistics.db
+Data.db
+CRC.db
+TOC.txt
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 237573e..ce58e11 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -58,6 +58,7 @@ public class RowIndexEntryTest extends SchemaLoader
add(column);
}
while (size < DatabaseDescriptor.getColumnIndexSize() * 3);
+ finishAddingAtoms();
}}.build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index f8acd22..167671b 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.UUIDGen;
+
import org.apache.commons.lang3.StringUtils;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,6 +52,7 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -60,7 +63,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.Util.cellname;
import static org.apache.cassandra.Util.column;
-
import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -362,6 +364,59 @@ public class ScrubTest extends SchemaLoader
assert rows.size() == 6 : "Got " + rows.size();
}
+ @Test
+ public void testScrub10791() throws Exception
+ {
+ // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+ CompactionManager.instance.disableAutoCompaction();
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ String columnFamily = "StandardInteger1";
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ String root = System.getProperty("corrupt-sstable-root");
+ assert root != null;
+ File rootDir = new File(root);
+ assert rootDir.isDirectory();
+ Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
+ CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
+
+ // open without validation for scrubbing
+ Set<Component> components = new HashSet<>();
+ components.add(Component.DATA);
+ components.add(Component.PRIMARY_INDEX);
+ components.add(Component.FILTER);
+ components.add(Component.STATS);
+ components.add(Component.SUMMARY);
+ components.add(Component.TOC);
+ SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
+
+ Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
+ scrubber.scrub();
+
+ cfs.loadNewSSTables();
+ assertEquals(7, countCells(cfs));
+ }
+
+ private int countCells(ColumnFamilyStore cfs)
+ {
+ int cellCount = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+ while (it.hasNext())
+ {
+ Iterator<OnDiskAtom> itr = it.next();
+ while (itr.hasNext())
+ {
+ ++cellCount;
+ itr.next();
+ }
+ }
+ }
+ return cellCount;
+ }
+
private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
{
boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 06ebdd3..31dc492 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -26,9 +26,11 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.Scrubber;
+import org.apache.cassandra.db.compaction.Scrubber.ScrubResult;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -51,8 +56,9 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
-
+import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.concurrent.Refs;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.apache.cassandra.Util.cellname;
@@ -265,7 +271,7 @@ public class StreamingTransferTest extends SchemaLoader
Keyspace keyspace = Keyspace.open(ks);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
- String key = "key1";
+ String key = "key0";
Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
// add columns of size slightly less than column_index_size to force insert column index
rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
@@ -274,9 +280,21 @@ public class StreamingTransferTest extends SchemaLoader
// add RangeTombstones
cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+ cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+ rm.apply();
+
+ key = "key1";
+ rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+ // add columns of size slightly less than column_index_size to force insert column index
+ rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+ cf = rm.addOrGet(cfname);
+ // add RangeTombstones
+ cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
rm.apply();
+
cfs.forceBlockingFlush();
+ int cellCount = countCells(cfs);
SSTableReader sstable = cfs.getSSTables().iterator().next();
cfs.clearUnsafe();
transferSSTables(sstable);
@@ -284,8 +302,30 @@ public class StreamingTransferTest extends SchemaLoader
// confirm that a single SSTable was transferred and registered
assertEquals(1, cfs.getSSTables().size());
+ // Verify table
+ assertEquals(cellCount, countCells(cfs));
+
List<Row> rows = Util.getRangeSlice(cfs);
- assertEquals(1, rows.size());
+ assertEquals(2, rows.size());
+ }
+
+ private int countCells(ColumnFamilyStore cfs)
+ {
+ int cellCount = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+ while (it.hasNext())
+ {
+ Iterator<OnDiskAtom> itr = it.next();
+ while (itr.hasNext())
+ {
+ ++cellCount;
+ itr.next();
+ }
+ }
+ }
+ return cellCount;
}
@Test