Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/02 15:10:58 UTC
[1/5] cassandra git commit: Avoid writing range tombstones after END_OF_ROW marker.
Repository: cassandra
Updated Branches:
refs/heads/trunk 04e5f5d96 -> 51464631c
Avoid writing range tombstones after END_OF_ROW marker.
Patch by Branimir Lambov; reviewed by marcuse for CASSANDRA-10791
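
The root cause: ColumnIndex.Builder defers writing range tombstones until later atoms force them out, and before this patch the deferred flush lived in build(). In the stream-append path, build() is only reached from afterAppend, after the END_OF_ROW marker has been written (visible in the BigTableWriter listing further down), so a row whose last atoms were range tombstones got those tombstones serialized past the row terminator, corrupting the data file. The fix adds an explicit finishAddingAtoms() flush step that writers call before closing the row, and teaches the scrubber to salvage rows already written the old way. Below is a toy, self-contained model of the invariant being enforced; the names are illustrative, not Cassandra's API:

    import java.io.*;
    import java.util.*;

    // Toy model: anything still buffered must be flushed *before* the
    // end-of-row terminator; closing the row with a non-empty buffer is an error.
    public class RowWriterModel
    {
        static final int END_OF_ROW = 0x0000;
        private final DataOutputStream out;
        private final Deque<byte[]> buffered = new ArrayDeque<>(); // stand-in for unwritten range tombstones

        RowWriterModel(OutputStream os) { out = new DataOutputStream(os); }

        void add(byte[] atom) { buffered.add(atom); } // writes may be deferred

        void finishAddingAtoms() throws IOException   // analogue of the new method
        {
            while (!buffered.isEmpty())
                out.write(buffered.poll());
        }

        void closeRow() throws IOException
        {
            if (!buffered.isEmpty()) // analogue of the new assert in build()
                throw new IllegalStateException("flush buffered tombstones before END_OF_ROW");
            out.writeShort(END_OF_ROW);
        }

        public static void main(String[] args) throws IOException
        {
            RowWriterModel w = new RowWriterModel(new ByteArrayOutputStream());
            w.add(new byte[]{ 1, 2, 3 });
            w.finishAddingAtoms(); // drop this call to reproduce the failure mode
            w.closeRow();
        }
    }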
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b26ca68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b26ca68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b26ca68
Branch: refs/heads/trunk
Commit: 0b26ca68747cdecb907d7c238e04b39836efe3d1
Parents: 5175326
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 1 11:38:09 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 14:59:11 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 16 ++--
.../org/apache/cassandra/db/RangeTombstone.java | 5 ++
.../cassandra/db/compaction/Scrubber.java | 25 +++++-
.../io/sstable/SSTableIdentityIterator.java | 79 +++++++++++++++----
.../cassandra/io/sstable/SSTableWriter.java | 2 +
.../Keyspace1-Standard3-jb-1-Summary.db | Bin 71 -> 63 bytes
.../Keyspace1-StandardInteger1-ka-2-CRC.db | Bin 0 -> 8 bytes
.../Keyspace1-StandardInteger1-ka-2-Data.db | Bin 0 -> 12357 bytes
.../Keyspace1-StandardInteger1-ka-2-Digest.sha1 | 1 +
.../Keyspace1-StandardInteger1-ka-2-Filter.db | Bin 0 -> 176 bytes
.../Keyspace1-StandardInteger1-ka-2-Index.db | Bin 0 -> 108 bytes
...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes
.../Keyspace1-StandardInteger1-ka-2-Summary.db | Bin 0 -> 80 bytes
.../Keyspace1-StandardInteger1-ka-2-TOC.txt | 8 ++
.../apache/cassandra/db/RowIndexEntryTest.java | 1 +
.../unit/org/apache/cassandra/db/ScrubTest.java | 57 ++++++++++++-
.../streaming/StreamingTransferTest.java | 46 ++++++++++-
18 files changed, 215 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b0f9588..e00abfe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
* Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
* Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 0ea5c87..f63dfe1 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -147,6 +147,7 @@ public class ColumnIndex
add(tombstone);
tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
}
+ finishAddingAtoms();
ColumnIndex index = build();
maybeWriteEmptyRowHeader();
@@ -167,6 +168,7 @@ public class ColumnIndex
OnDiskAtom c = columns.next();
add(c);
}
+ finishAddingAtoms();
return build();
}
@@ -218,15 +220,19 @@ public class ColumnIndex
}
}
- public ColumnIndex build() throws IOException
+ public void finishAddingAtoms() throws IOException
{
- // all columns were GC'd after all
- if (lastColumn == null)
- return ColumnIndex.EMPTY;
-
long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer);
endPosition += size;
blockSize += size;
+ }
+
+ public ColumnIndex build()
+ {
+ assert !tombstoneTracker.hasUnwrittenTombstones(); // finishAddingAtoms must be called before building.
+ // all columns were GC'd after all
+ if (lastColumn == null)
+ return ColumnIndex.EMPTY;
// the last column may have fallen on an index boundary already. if not, index it explicitly.
if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn)
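
The Builder now has an explicit two-phase finish: finishAddingAtoms() performs the tombstone flush that build() used to do, and build() merely asserts that the flush already happened (the "all columns were GC'd" early return stays in build(), so an empty row still yields ColumnIndex.EMPTY). A sketch of the resulting caller contract, using the names from this hunk (Cassandra 2.1 internals, so illustrative rather than standalone):

    while (atoms.hasNext())
        builder.add(atoms.next());       // may leave range tombstones buffered
    builder.finishAddingAtoms();         // writes any unwritten tombstones now
    ColumnIndex index = builder.build(); // asserts !tombstoneTracker.hasUnwrittenTombstones()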
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 4d22d48..5e41792 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -325,6 +325,11 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements
return false;
}
+ public boolean hasUnwrittenTombstones()
+ {
+ return !unwrittenTombstones.isEmpty();
+ }
+
/**
* The tracker needs to track expired range tombstone but keep tracks that they are
* expired, so this is what this class is used for.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 400df08..e02f901 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -155,6 +155,22 @@ public class Scrubber implements Closeable
if (scrubInfo.isStopRequested())
throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
+ updateIndexKey();
+
+ if (prevKey != null && indexFile != null)
+ {
+ long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+ if (dataFile.getFilePointer() < nextRowStart)
+ {
+ // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+ saveOutOfOrderRow(prevKey,
+ SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
+ String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+ if (dataFile.isEOF())
+ break;
+ }
+ }
+
long rowStart = dataFile.getFilePointer();
outputHandler.debug("Reading row at " + rowStart);
@@ -170,8 +186,6 @@ public class Scrubber implements Closeable
// check for null key below
}
- updateIndexKey();
-
long dataStart = dataFile.getFilePointer();
long dataStartFromIndex = -1;
@@ -369,8 +383,13 @@ public class Scrubber implements Closeable
private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
{
+ saveOutOfOrderRow(key, atoms, String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ }
+
+ void saveOutOfOrderRow(DecoratedKey key, SSTableIdentityIterator atoms, String message)
+ {
// TODO bitch if the row is too large? if it is there's not much we can do ...
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ outputHandler.warn(message);
// adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
// and there's no sense in failing on mis-sorted cells when a TreeMap could save us
ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);
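
The detection added above runs before each row is read: updateIndexKey() moves to the top of the loop so the index already points at where the next row should begin, and any gap between the data file's position (just past the previous row's END_OF_ROW) and that expected start must be a fragment written after the marker. A sketch of the arithmetic, with names taken from this hunk:

    long nextRowStart = currentRowPositionFromIndex == -1
                      ? dataFile.length()            // previous row was the file's last
                      : currentRowPositionFromIndex; // next row's start per the index
    long fragmentLength = nextRowStart - dataFile.getFilePointer();
    if (fragmentLength > 0)
        // CASSANDRA-10791: salvage the orphaned atoms, attributed to prevKey,
        // into the out-of-order writer via createFragmentIterator(...)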
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 498ad26..45994d0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -20,9 +20,13 @@ package org.apache.cassandra.io.sstable;
import java.io.*;
import java.util.Iterator;
+import com.google.common.collect.AbstractIterator;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.sstable.Descriptor.Version;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.serializers.MarshalException;
@@ -66,6 +70,35 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
}
+ /**
+ * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+ */
+ public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+ {
+ final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+ final CellNameType type = sstable.metadata.comparator;
+ final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+ final Version version = sstable.descriptor.version;
+ final long dataEnd = file.getFilePointer() + dataSize;
+ return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
+ new AbstractIterator<OnDiskAtom>()
+ {
+ protected OnDiskAtom computeNext()
+ {
+ if (file.getFilePointer() >= dataEnd)
+ return endOfData();
+ try
+ {
+ return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ });
+ }
+
// sstable may be null *if* checkData is false
// If it is null, we assume the data is in the current file format
private SSTableIdentityIterator(CFMetaData metadata,
@@ -77,23 +110,15 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
SSTableReader sstable,
ColumnSerializer.Flag flag)
{
- assert !checkData || (sstable != null);
- this.in = in;
- this.filename = filename;
- this.key = key;
- this.dataSize = dataSize;
- this.flag = flag;
- this.validateColumns = checkData;
- this.sstable = sstable;
-
- Descriptor.Version dataVersion = sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version;
- int expireBefore = (int) (System.currentTimeMillis() / 1000);
- columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+ this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
+ metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
+ }
+ private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+ {
try
{
- columnFamily.delete(DeletionTime.serializer.deserialize(in));
- atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+ return DeletionTime.serializer.deserialize(in);
}
catch (IOException e)
{
@@ -103,6 +128,32 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
}
}
+ // sstable may be null *if* checkData is false
+ // If it is null, we assume the data is in the current file format
+ private SSTableIdentityIterator(CFMetaData metadata,
+ DataInput in,
+ String filename,
+ DecoratedKey key,
+ long dataSize,
+ boolean checkData,
+ SSTableReader sstable,
+ ColumnSerializer.Flag flag,
+ DeletionTime deletion,
+ Iterator<OnDiskAtom> atomIterator)
+ {
+ assert !checkData || (sstable != null);
+ this.in = in;
+ this.filename = filename;
+ this.key = key;
+ this.dataSize = dataSize;
+ this.flag = flag;
+ this.validateColumns = checkData;
+ this.sstable = sstable;
+ columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+ columnFamily.delete(deletion);
+ this.atomIterator = atomIterator;
+ }
+
public DecoratedKey getKey()
{
return key;
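
The refactoring above funnels both construction paths into one private constructor; the notable difference is the deletion-time argument. A normal row reads its DeletionTime off the stream (readDeletionTime) before handing the input to metadata.getOnDiskIterator, whereas a fragment starts directly at atom data, so createFragmentIterator passes DeletionTime.LIVE and a bounded iterator that stops at file.getFilePointer() + dataSize instead of at an END_OF_ROW marker it would never find.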
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8e0b5f7..8620f30 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -319,6 +320,7 @@ public class SSTableWriter extends SSTable
columnIndexer.add(atom); // This writes the atom on disk too
}
+ columnIndexer.finishAddingAtoms();
columnIndexer.maybeWriteEmptyRowHeader();
dataFile.stream.writeShort(END_OF_ROW);
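
This is the writer-side half of the fix: the flush now happens inside the row body rather than in build(), which runs only after the marker. Annotated order after the change (identifiers from the hunk above; illustrative):

    columnIndexer.add(atom);                // trailing range tombstones may stay buffered
    columnIndexer.finishAddingAtoms();      // flush them while still inside the row
    columnIndexer.maybeWriteEmptyRowHeader();
    dataFile.stream.writeShort(END_OF_ROW); // must be the last bytes of the row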
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
index 376ca9d..7621f07 100644
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db
new file mode 100644
index 0000000..fc23cfe
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-CRC.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db
new file mode 100644
index 0000000..a4157d3
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Data.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1 b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
new file mode 100644
index 0000000..fb42fa9
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Digest.sha1
@@ -0,0 +1 @@
+3265926428
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db
new file mode 100644
index 0000000..eb0ae30
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Filter.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db
new file mode 100644
index 0000000..69a2fce
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Index.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db
new file mode 100644
index 0000000..1cba196
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Statistics.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
new file mode 100644
index 0000000..22cfa6a
Binary files /dev/null and b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
new file mode 100644
index 0000000..503f64d
--- /dev/null
+++ b/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-TOC.txt
@@ -0,0 +1,8 @@
+Digest.sha1
+Summary.db
+Filter.db
+Index.db
+Statistics.db
+Data.db
+CRC.db
+TOC.txt
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 237573e..ce58e11 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -58,6 +58,7 @@ public class RowIndexEntryTest extends SchemaLoader
add(column);
}
while (size < DatabaseDescriptor.getColumnIndexSize() * 3);
+ finishAddingAtoms();
}}.build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index f8acd22..167671b 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -38,7 +38,9 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.UUIDGen;
+
import org.apache.commons.lang3.StringUtils;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,6 +52,7 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -60,7 +63,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.Util.cellname;
import static org.apache.cassandra.Util.column;
-
import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -362,6 +364,59 @@ public class ScrubTest extends SchemaLoader
assert rows.size() == 6 : "Got " + rows.size();
}
+ @Test
+ public void testScrub10791() throws Exception
+ {
+ // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+ CompactionManager.instance.disableAutoCompaction();
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ String columnFamily = "StandardInteger1";
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ String root = System.getProperty("corrupt-sstable-root");
+ assert root != null;
+ File rootDir = new File(root);
+ assert rootDir.isDirectory();
+ Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
+ CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
+
+ // open without validation for scrubbing
+ Set<Component> components = new HashSet<>();
+ components.add(Component.DATA);
+ components.add(Component.PRIMARY_INDEX);
+ components.add(Component.FILTER);
+ components.add(Component.STATS);
+ components.add(Component.SUMMARY);
+ components.add(Component.TOC);
+ SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
+
+ Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
+ scrubber.scrub();
+
+ cfs.loadNewSSTables();
+ assertEquals(7, countCells(cfs));
+ }
+
+ private int countCells(ColumnFamilyStore cfs)
+ {
+ int cellCount = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+ while (it.hasNext())
+ {
+ Iterator<OnDiskAtom> itr = it.next();
+ while (itr.hasNext())
+ {
+ ++cellCount;
+ itr.next();
+ }
+ }
+ }
+ return cellCount;
+ }
+
private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
{
boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26ca68/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 06ebdd3..31dc492 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -26,9 +26,11 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.Scrubber;
+import org.apache.cassandra.db.compaction.Scrubber.ScrubResult;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -51,8 +56,9 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
-
+import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.concurrent.Refs;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.apache.cassandra.Util.cellname;
@@ -265,7 +271,7 @@ public class StreamingTransferTest extends SchemaLoader
Keyspace keyspace = Keyspace.open(ks);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
- String key = "key1";
+ String key = "key0";
Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
// add columns of size slightly less than column_index_size to force insert column index
rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
@@ -274,9 +280,21 @@ public class StreamingTransferTest extends SchemaLoader
// add RangeTombstones
cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+ cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+ rm.apply();
+
+ key = "key1";
+ rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+ // add columns of size slightly less than column_index_size to force insert column index
+ rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+ cf = rm.addOrGet(cfname);
+ // add RangeTombstones
+ cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
rm.apply();
+
cfs.forceBlockingFlush();
+ int cellCount = countCells(cfs);
SSTableReader sstable = cfs.getSSTables().iterator().next();
cfs.clearUnsafe();
transferSSTables(sstable);
@@ -284,8 +302,30 @@ public class StreamingTransferTest extends SchemaLoader
// confirm that a single SSTable was transferred and registered
assertEquals(1, cfs.getSSTables().size());
+ // Verify table
+ assertEquals(cellCount, countCells(cfs));
+
List<Row> rows = Util.getRangeSlice(cfs);
- assertEquals(1, rows.size());
+ assertEquals(2, rows.size());
+ }
+
+ private int countCells(ColumnFamilyStore cfs)
+ {
+ int cellCount = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+ while (it.hasNext())
+ {
+ Iterator<OnDiskAtom> itr = it.next();
+ while (itr.hasNext())
+ {
+ ++cellCount;
+ itr.next();
+ }
+ }
+ }
+ return cellCount;
}
@Test
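
The widened test now writes two partitions: "key0" carries a large indexed column plus three range tombstones, and "key1" ends with nothing but a range tombstone after its single column, exactly the shape that left a tombstone buffered when the row was closed. Counting cells before the flush and again after the streaming transfer (and asserting two rows instead of one) verifies that no atoms are dropped or spilled past END_OF_ROW on the receiving side.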
[5/5] cassandra git commit: Merge branch 'cassandra-3.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-3.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/51464631
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/51464631
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/51464631
Branch: refs/heads/trunk
Commit: 51464631c5445f7774ad95a1f1b0c12ac5aeb073
Parents: 04e5f5d b97fa11
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 15:09:27 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 15:09:27 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------
[3/5] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb6973a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb6973a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb6973a6
Branch: refs/heads/trunk
Commit: bb6973a6fe132249031a281788f232c975e3456f
Parents: b950c38 882df8a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 15:09:09 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 15:09:09 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------
[2/5] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/882df8a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/882df8a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/882df8a2
Branch: refs/heads/trunk
Commit: 882df8a21711559d18bf6b38cd6026d78b7e4956
Parents: 29ed6fe 0b26ca6
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 15:06:43 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 15:06:43 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 16 ++--
.../org/apache/cassandra/db/RangeTombstone.java | 5 ++
.../cassandra/db/compaction/Scrubber.java | 25 +++++-
.../io/sstable/SSTableIdentityIterator.java | 76 +++++++++++++++----
.../io/sstable/format/big/BigTableWriter.java | 1 +
.../Keyspace1-Standard3-jb-1-Summary.db | Bin 71 -> 63 bytes
.../Keyspace1-StandardInteger1-ka-2-CRC.db | Bin 0 -> 8 bytes
.../Keyspace1-StandardInteger1-ka-2-Data.db | Bin 0 -> 12357 bytes
.../Keyspace1-StandardInteger1-ka-2-Digest.sha1 | 1 +
.../Keyspace1-StandardInteger1-ka-2-Filter.db | Bin 0 -> 176 bytes
.../Keyspace1-StandardInteger1-ka-2-Index.db | Bin 0 -> 108 bytes
...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes
.../Keyspace1-StandardInteger1-ka-2-Summary.db | Bin 0 -> 112 bytes
.../Keyspace1-StandardInteger1-ka-2-TOC.txt | 8 ++
.../apache/cassandra/db/RowIndexEntryTest.java | 1 +
.../unit/org/apache/cassandra/db/ScrubTest.java | 60 +++++++++++++++
.../streaming/StreamingTransferTest.java | 46 ++++++++++-
18 files changed, 216 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eaad3a2,e00abfe..bca5fb0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,5 +1,24 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
+Merged from 2.1:
+ * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
* Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
* Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 670c682,e02f901..9fd8560
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -162,6 -155,22 +162,22 @@@ public class Scrubber implements Closea
if (scrubInfo.isStopRequested())
throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
+ updateIndexKey();
+
+ if (prevKey != null && indexFile != null)
+ {
+ long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+ if (dataFile.getFilePointer() < nextRowStart)
+ {
+ // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+ saveOutOfOrderRow(prevKey,
- SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
++ SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), checkData),
+ String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+ if (dataFile.isEOF())
+ break;
+ }
+ }
+
long rowStart = dataFile.getFilePointer();
outputHandler.debug("Reading row at " + rowStart);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 17f9a8d,45994d0..8c02ee7
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@@ -20,13 -20,13 +20,16 @@@ package org.apache.cassandra.io.sstable
import java.io.*;
import java.util.Iterator;
+ import com.google.common.collect.AbstractIterator;
+
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+ import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.Descriptor.Version;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.serializers.MarshalException;
@@@ -60,13 -62,43 +63,42 @@@
* @param sstable SSTable we are reading from.
* @param file Reading using this file.
* @param key Key of this row.
- * @param dataSize length of row data
* @param checkData if true, do its best to deserialize and check the coherence of row data
*/
- public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+ public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, boolean checkData)
{
- this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
+ this(sstable.metadata, file, file.getPath(), key, checkData, sstable, ColumnSerializer.Flag.LOCAL);
}
+ /**
+ * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+ */
+ public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+ {
+ final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+ final CellNameType type = sstable.metadata.comparator;
+ final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+ final Version version = sstable.descriptor.version;
+ final long dataEnd = file.getFilePointer() + dataSize;
- return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
++ return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, checkData, sstable, flag, DeletionTime.LIVE,
+ new AbstractIterator<OnDiskAtom>()
+ {
+ protected OnDiskAtom computeNext()
+ {
+ if (file.getFilePointer() >= dataEnd)
+ return endOfData();
+ try
+ {
+ return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ });
+ }
+
// sstable may be null *if* checkData is false
// If it is null, we assume the data is in the current file format
private SSTableIdentityIterator(CFMetaData metadata,
@@@ -77,22 -110,15 +109,16 @@@
SSTableReader sstable,
ColumnSerializer.Flag flag)
{
- assert !checkData || (sstable != null);
- this.in = in;
- this.filename = filename;
- this.key = key;
- this.flag = flag;
- this.validateColumns = checkData;
- this.sstable = sstable;
-
- Version dataVersion = sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version;
- int expireBefore = (int) (System.currentTimeMillis() / 1000);
- columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
- this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
- metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
++ this(metadata, in, filename, key, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
++ metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000),
++ sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version));
+ }
+ private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+ {
try
{
- columnFamily.delete(DeletionTime.serializer.deserialize(in));
- atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+ return DeletionTime.serializer.deserialize(in);
}
catch (IOException e)
{
@@@ -102,6 -128,32 +128,30 @@@
}
}
+ // sstable may be null *if* checkData is false
+ // If it is null, we assume the data is in the current file format
+ private SSTableIdentityIterator(CFMetaData metadata,
+ DataInput in,
+ String filename,
+ DecoratedKey key,
- long dataSize,
+ boolean checkData,
+ SSTableReader sstable,
+ ColumnSerializer.Flag flag,
+ DeletionTime deletion,
+ Iterator<OnDiskAtom> atomIterator)
+ {
+ assert !checkData || (sstable != null);
+ this.in = in;
+ this.filename = filename;
+ this.key = key;
- this.dataSize = dataSize;
+ this.flag = flag;
+ this.validateColumns = checkData;
+ this.sstable = sstable;
+ columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+ columnFamily.delete(deletion);
+ this.atomIterator = atomIterator;
+ }
+
public DecoratedKey getKey()
{
return key;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index d064e69,0000000..505bac0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,587 -1,0 +1,588 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+import org.apache.cassandra.utils.SyncUtil;
+
+public class BigTableWriter extends SSTableWriter
+{
+ private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
+
+ // not very random, but the only value that can't be mistaken for a legal column-name length
+ public static final int END_OF_ROW = 0x0000;
+
+ private final IndexWriter iwriter;
+ private SegmentedFile.Builder dbuilder;
+ private final SequentialWriter dataFile;
+ private DecoratedKey lastWrittenKey;
+ private FileMark dataMark;
+
+ BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ {
+ super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+
+ if (compression)
+ {
+ dataFile = SequentialWriter.open(getFilename(),
+ descriptor.filenameFor(Component.COMPRESSION_INFO),
+ metadata.compressionParameters(),
+ metadataCollector);
+ dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
+ }
+ else
+ {
+ dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
+ dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
+ }
+ iwriter = new IndexWriter(keyCount, dataFile);
+ }
+
+ public void mark()
+ {
+ dataMark = dataFile.mark();
+ iwriter.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ dataFile.resetAndTruncate(dataMark);
+ iwriter.resetAndTruncate();
+ }
+
+ /**
+ * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
+ */
+ private long beforeAppend(DecoratedKey decoratedKey)
+ {
+ assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
+ if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
+ throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
+ return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
+ }
+
+ private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException
+ {
+ metadataCollector.addKey(decoratedKey.getKey());
+ lastWrittenKey = decoratedKey;
+ last = lastWrittenKey;
+ if (first == null)
+ first = lastWrittenKey;
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote {} at {}", decoratedKey, dataEnd);
+ iwriter.append(decoratedKey, index, dataEnd);
+ dbuilder.addPotentialBoundary(dataEnd);
+ }
+
+ /**
+ * @param row
+ * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ */
+ public RowIndexEntry append(AbstractCompactedRow row)
+ {
+ long startPosition = beforeAppend(row.key);
+ RowIndexEntry entry;
+ try
+ {
+ entry = row.write(startPosition, dataFile);
+ if (entry == null)
+ return null;
+ long endPosition = dataFile.getFilePointer();
+ long rowSize = endPosition - startPosition;
+ maybeLogLargePartitionWarning(row.key, rowSize);
+ metadataCollector.update(rowSize, row.columnStats());
+ afterAppend(row.key, endPosition, entry);
+ return entry;
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ }
+
+ public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+ {
+ if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+ {
+ logger.error("Key size {} exceeds maximum of {}, skipping row",
+ decoratedKey.getKey().remaining(),
+ FBUtilities.MAX_UNSIGNED_SHORT);
+ return;
+ }
+
+ long startPosition = beforeAppend(decoratedKey);
+ long endPosition;
+ try
+ {
+ RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+ endPosition = dataFile.getFilePointer();
+ afterAppend(decoratedKey, endPosition, entry);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ long rowSize = endPosition - startPosition;
+ maybeLogLargePartitionWarning(decoratedKey, rowSize);
+ metadataCollector.update(endPosition - startPosition, cf.getColumnStats());
+ }
+
+ private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
+ {
+ if (rowSize > DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
+ {
+ String keyString = metadata.getKeyValidator().getString(key.getKey());
+ logger.warn("Writing large partition {}/{}:{} ({} bytes)", metadata.ksName, metadata.cfName, keyString, rowSize);
+ }
+ }
+
+ private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
+ {
+ assert cf.hasColumns() || cf.isMarkedForDelete();
+
+ ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
+ ColumnIndex index = builder.build(cf);
+
+ out.writeShort(END_OF_ROW);
+ return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+ }
+
+ /**
+ * @throws IOException if a read from the DataInput fails
+ * @throws FSWriteError if a write to the dataFile fails
+ */
+ public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
+ {
+ long currentPosition = beforeAppend(key);
+
+ ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
+ ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
+ ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
+ List<ByteBuffer> minColumnNames = Collections.emptyList();
+ List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
+ boolean hasLegacyCounterShards = false;
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+ cf.delete(DeletionTime.serializer.deserialize(in));
+
+ ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+
+ if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
+ {
+ tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ }
+
+ Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
+ while (rangeTombstoneIterator.hasNext())
+ {
+ RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
+ tombstones.update(rangeTombstone.getLocalDeletionTime());
+ minTimestampTracker.update(rangeTombstone.timestamp());
+ maxTimestampTracker.update(rangeTombstone.timestamp());
+ maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
+ }
+
+ Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
+ try
+ {
+ while (iter.hasNext())
+ {
+ OnDiskAtom atom = iter.next();
+ if (atom == null)
+ break;
+
+ if (atom instanceof CounterCell)
+ {
+ atom = ((CounterCell) atom).markLocalToBeCleared();
+ hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+ }
+
+ int deletionTime = atom.getLocalDeletionTime();
+ if (deletionTime < Integer.MAX_VALUE)
+ tombstones.update(deletionTime);
+ minTimestampTracker.update(atom.timestamp());
+ maxTimestampTracker.update(atom.timestamp());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
+ maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
+
+ columnIndexer.add(atom); // This writes the atom on disk too
+ }
++ columnIndexer.finishAddingAtoms();
+
+ columnIndexer.maybeWriteEmptyRowHeader();
+ dataFile.stream.writeShort(END_OF_ROW);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+
+ metadataCollector.updateMinTimestamp(minTimestampTracker.get())
+ .updateMaxTimestamp(maxTimestampTracker.get())
+ .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
+ .addRowSize(dataFile.getFilePointer() - currentPosition)
+ .addColumnCount(columnIndexer.writtenAtomCount())
+ .mergeTombstoneHistogram(tombstones)
+ .updateMinColumnNames(minColumnNames)
+ .updateMaxColumnNames(maxColumnNames)
+ .updateHasLegacyCounterShards(hasLegacyCounterShards);
+
+ afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
+ return currentPosition;
+ }
+
+ private Descriptor makeTmpLinks()
+ {
+ // create temp links if they don't already exist
+ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+ }
+ return link;
+ }
+
+ @SuppressWarnings("resource")
+ public SSTableReader openEarly()
+ {
+ // find the max (exclusive) readable key
+ IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
+ if (boundary == null)
+ return null;
+
+ StatsMetadata stats = statsMetadata();
+ assert boundary.indexLength > 0 && boundary.dataLength > 0;
+ Descriptor link = makeTmpLinks();
+ // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
+ SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength);
+ SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), boundary.dataLength);
+ SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+ components, metadata,
+ partitioner, ifile,
+ dfile, iwriter.summary.build(partitioner, boundary),
+ iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
+
+ // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(boundary.lastKey);
+ return sstable;
+ }
+
+ public SSTableReader openFinalEarly()
+ {
+ // we must ensure the data is completely flushed to disk
+ dataFile.sync();
+ iwriter.indexFile.sync();
+ return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
+ }
+
+ @SuppressWarnings("resource")
+ private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
+ {
+ if (maxDataAge < 0)
+ maxDataAge = System.currentTimeMillis();
+
+ StatsMetadata stats = statsMetadata();
+ // finalize in-memory state for the reader
+ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX));
+ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA));
+ SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
+ components,
+ this.metadata,
+ partitioner,
+ ifile,
+ dfile,
+ iwriter.summary.build(partitioner),
+ iwriter.bf.sharedCopy(),
+ maxDataAge,
+ stats,
+ openReason);
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(last);
+ return sstable;
+ }
+
+ protected SSTableWriter.TransactionalProxy txnProxy()
+ {
+ return new TransactionalProxy();
+ }
+
+ class TransactionalProxy extends SSTableWriter.TransactionalProxy
+ {
+ // finalise our state on disk, including renaming
+ protected void doPrepare()
+ {
+ iwriter.prepareToCommit();
+
+ // write sstable statistics
+ dataFile.setDescriptor(descriptor).prepareToCommit();
+ writeMetadata(descriptor, finalizeMetadata());
+
+ // save the table of components
+ SSTable.appendTOC(descriptor, components);
+
+ // rename to final
+ rename(descriptor, components);
+
+ if (openResult)
+ finalReader = openFinal(descriptor.asType(Descriptor.Type.FINAL), SSTableReader.OpenReason.NORMAL);
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ accumulate = dataFile.commit(accumulate);
+ accumulate = iwriter.commit(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ protected Throwable doPreCleanup(Throwable accumulate)
+ {
+ accumulate = dbuilder.close(accumulate);
+ return accumulate;
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ accumulate = iwriter.abort(accumulate);
+ accumulate = dataFile.abort(accumulate);
+
+ accumulate = delete(descriptor, accumulate);
+ if (!openResult)
+ accumulate = delete(descriptor.asType(Descriptor.Type.FINAL), accumulate);
+ return accumulate;
+ }
+
+ private Throwable delete(Descriptor desc, Throwable accumulate)
+ {
+ try
+ {
+ Set<Component> components = SSTable.discoverComponentsFor(desc);
+ if (!components.isEmpty())
+ SSTable.delete(desc, components);
+ }
+ catch (Throwable t)
+ {
+ logger.error(String.format("Failed deleting temp components for %s", descriptor), t);
+ accumulate = merge(accumulate, t);
+ }
+ return accumulate;
+ }
+ }
+
+ private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
+ {
+ File file = new File(desc.filenameFor(Component.STATS));
+ try (SequentialWriter out = SequentialWriter.open(file);)
+ {
+ desc.getMetadataSerializer().serialize(components, out.stream);
+ out.setDescriptor(desc).finish();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, file.getPath());
+ }
+ }
+
+ public long getFilePointer()
+ {
+ return dataFile.getFilePointer();
+ }
+
+ public long getOnDiskFilePointer()
+ {
+ return dataFile.getOnDiskFilePointer();
+ }
+
+ /**
+ * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+ */
+ class IndexWriter extends AbstractTransactional implements Transactional
+ {
+ private final SequentialWriter indexFile;
+ public final SegmentedFile.Builder builder;
+ public final IndexSummaryBuilder summary;
+ public final IFilter bf;
+ private FileMark mark;
+
+ IndexWriter(long keyCount, final SequentialWriter dataFile)
+ {
+ indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+ builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
+ summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
+ bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+ // register listeners to be alerted when the data files are flushed
+ indexFile.setPostFlushListener(new Runnable()
+ {
+ public void run()
+ {
+ summary.markIndexSynced(indexFile.getLastFlushOffset());
+ }
+ });
+ dataFile.setPostFlushListener(new Runnable()
+ {
+ public void run()
+ {
+ summary.markDataSynced(dataFile.getLastFlushOffset());
+ }
+ });
+ }
+
+ // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
+ IndexSummaryBuilder.ReadableBoundary getMaxReadable()
+ {
+ return summary.getLastReadableBoundary();
+ }
+
+ public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException
+ {
+ bf.add(key);
+ long indexStart = indexFile.getFilePointer();
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
+ rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, indexFile.getPath());
+ }
+ long indexEnd = indexFile.getFilePointer();
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
+
+ summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
+ builder.addPotentialBoundary(indexStart);
+ }
+
+ /**
+ * Serializes the bloom filter to disk and syncs it, if this sstable has a FILTER component.
+ */
+ void flushBf()
+ {
+ if (components.contains(Component.FILTER))
+ {
+ String path = descriptor.filenameFor(Component.FILTER);
+ try (FileOutputStream fos = new FileOutputStream(path);
+ DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
+ {
+ // bloom filter
+ FilterFactory.serialize(bf, stream);
+ stream.flush();
+ SyncUtil.sync(fos);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, path);
+ }
+ }
+ }
+
+ public void mark()
+ {
+ mark = indexFile.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+ // we can't reset dbuilder either, but that is the last thing called in afterappend so
+ // we assume that if that worked then we won't be trying to reset.
+ indexFile.resetAndTruncate(mark);
+ }
+
+ protected void doPrepare()
+ {
+ flushBf();
+
+ // truncate index file
+ long position = iwriter.indexFile.getFilePointer();
+ iwriter.indexFile.setDescriptor(descriptor).prepareToCommit();
+ FileUtils.truncate(iwriter.indexFile.getPath(), position);
+
+ // save summary
+ summary.prepareToCommit();
+ try (IndexSummary summary = iwriter.summary.build(partitioner))
+ {
+ SSTableReader.saveSummary(descriptor, first, last, iwriter.builder, dbuilder, summary);
+ }
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ return indexFile.commit(accumulate);
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ return indexFile.abort(accumulate);
+ }
+
+ @Override
+ protected Throwable doPreCleanup(Throwable accumulate)
+ {
+ accumulate = summary.close(accumulate);
+ accumulate = bf.close(accumulate);
+ accumulate = builder.close(accumulate);
+ return accumulate;
+ }
+ }
+}
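
Both TransactionalProxy and IndexWriter above follow the same two-phase Transactional idiom: doPrepare() performs all the work that may fail, while doCommit() and doAbort() must not throw and instead fold any errors into a Throwable accumulator. The standalone sketch below illustrates only that control flow; the Resource interface and all names in it are hypothetical stand-ins for illustration, not Cassandra API.

    // Minimal sketch of the prepare/commit/abort idiom used by the writer above.
    // "Resource" is a hypothetical stand-in; only the control flow mirrors the diff.
    public class TransactionalSketch
    {
        interface Resource
        {
            void prepareToCommit();           // all fallible work happens here
            Throwable commit(Throwable acc);  // must not throw; accumulates errors
            Throwable abort(Throwable acc);   // best-effort cleanup, also accumulating
        }

        private final Resource dataFile;
        private final Resource indexWriter;

        TransactionalSketch(Resource dataFile, Resource indexWriter)
        {
            this.dataFile = dataFile;
            this.indexWriter = indexWriter;
        }

        public void finish()
        {
            // phase 1: anything that can fail happens before commit
            indexWriter.prepareToCommit();
            dataFile.prepareToCommit();

            // phase 2: commit never throws directly; errors are accumulated
            Throwable accumulate = null;
            accumulate = dataFile.commit(accumulate);
            accumulate = indexWriter.commit(accumulate);
            if (accumulate != null)
                throw new RuntimeException("commit failed", accumulate);
        }
    }
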
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --cc test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
index 0000000,22cfa6a..190922a
mode 000000,100644..100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 0d90354,167671b..b69a1f8
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -20,98 -20,68 +20,103 @@@ package org.apache.cassandra.db
*
*/
-import java.io.*;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ExecutionException;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.utils.UUIDGen;
+
import org.apache.commons.lang3.StringUtils;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Scrubber;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.Util;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.*;
+import static org.junit.Assume.assumeTrue;
+
import static org.apache.cassandra.Util.cellname;
import static org.apache.cassandra.Util.column;
-import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
@RunWith(OrderedJUnit4ClassRunner.class)
-public class ScrubTest extends SchemaLoader
+public class ScrubTest
{
- public String KEYSPACE = "Keyspace1";
- public String CF = "Standard1";
- public String CF3 = "Standard2";
- public String COUNTER_CF = "Counter1";
- private static Integer COMPRESSION_CHUNK_LENGTH = 4096;
+ public static final String KEYSPACE = "Keyspace1";
+ public static final String CF = "Standard1";
+ public static final String CF2 = "Standard2";
+ public static final String CF3 = "Standard3";
++ public static final String CFI1 = "StandardInteger1";
+ public static final String COUNTER_CF = "Counter1";
+ public static final String CF_UUID = "UUIDKeys";
+ public static final String CF_INDEX1 = "Indexed1";
+ public static final String CF_INDEX2 = "Indexed2";
+
+ public static final String COL_KEYS_INDEX = "birthdate";
+ public static final String COL_COMPOSITES_INDEX = "col1";
+ public static final String COL_NON_INDEX = "notanindexcol";
+
+ public static final Integer COMPRESSION_CHUNK_LENGTH = 4096;
@BeforeClass
- public static void loadSchema() throws ConfigurationException
+ public static void defineSchema() throws ConfigurationException
{
- loadSchema(COMPRESSION_CHUNK_LENGTH);
+ SchemaLoader.loadSchema();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF),
+ SchemaLoader.standardCFMD(KEYSPACE, CF2),
+ SchemaLoader.standardCFMD(KEYSPACE, CF3),
++ SchemaLoader.standardCFMD(KEYSPACE, CFI1),
+ SchemaLoader.standardCFMD(KEYSPACE, COUNTER_CF)
+ .defaultValidator(CounterColumnType.instance)
+ .compressionParameters(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
+ SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance),
+ SchemaLoader.indexCFMD(KEYSPACE, CF_INDEX1, true),
+ SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true));
}
@Test
@@@ -422,6 -364,59 +427,61 @@@
assert rows.size() == 6 : "Got " + rows.size();
}
+ @Test
+ public void testScrub10791() throws Exception
+ {
+ // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+ CompactionManager.instance.disableAutoCompaction();
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
- String columnFamily = "StandardInteger1";
++ String columnFamily = CFI1;
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ String root = System.getProperty("corrupt-sstable-root");
+ assert root != null;
+ File rootDir = new File(root);
+ assert rootDir.isDirectory();
- Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
- CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
++ Descriptor desc = new Descriptor("ka", rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY);
+
+ // open without validation for scrubbing
+ Set<Component> components = new HashSet<>();
+ components.add(Component.DATA);
+ components.add(Component.PRIMARY_INDEX);
+ components.add(Component.FILTER);
+ components.add(Component.STATS);
+ components.add(Component.SUMMARY);
+ components.add(Component.TOC);
- SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
++ SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs);
+
- Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
- scrubber.scrub();
++ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
++ Scrubber scrubber = new Scrubber(cfs, txn, false, true, true))
++ {
++ scrubber.scrub();
++ }
+
+ cfs.loadNewSSTables();
+ assertEquals(7, countCells(cfs));
+ }
+
+ private int countCells(ColumnFamilyStore cfs)
+ {
+ int cellCount = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+ while (it.hasNext())
+ {
+ Iterator<OnDiskAtom> itr = it.next();
+ while (itr.hasNext())
+ {
+ ++cellCount;
+ itr.next();
+ }
+ }
+ }
+ return cellCount;
+ }
+
private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
{
boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
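
Note that testScrub10791 reads its input from outside the build tree: it must be run with the corrupt-sstable-root system property pointing at the checked-in test/data/corrupt-sstables directory, or the assertion on the property will fail. Stripped of the setup and assertions, the offline-scrub pattern the test exercises reduces to the usage sketch below (cfs and sstable are assumed to be obtained as in the test above):

    // Usage sketch: offline scrub of a single pre-opened sstable,
    // following the same calls as testScrub10791 above.
    try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
         Scrubber scrubber = new Scrubber(cfs, txn, false, true, true))
    {
        scrubber.scrub();        // rewrites the sstable contents
    }
    cfs.loadNewSSTables();       // make the scrubbed sstable visible to the cfs
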
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 3c799e2,31dc492..e751968
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -26,7 -26,7 +26,8 @@@ import java.util.concurrent.TimeUnit
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@@ -306,9 -280,21 +312,21 @@@ public class StreamingTransferTes
// add RangeTombstones
cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+ cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- rm.apply();
++ rm.applyUnsafe();
+
+ key = "key1";
+ rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+ // add columns of size slightly less than column_index_size to force insert column index
+ rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+ cf = rm.addOrGet(cfname);
+ // add RangeTombstones
+ cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
- rm.apply();
+ rm.applyUnsafe();
+
cfs.forceBlockingFlush();
+ int cellCount = countCells(cfs);
SSTableReader sstable = cfs.getSSTables().iterator().next();
cfs.clearUnsafe();
transferSSTables(sstable);
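
The added tombstone spanning cellname(8)..cellname(10) is the interesting one: it sorts after the row's last live cell, which is exactly the shape that used to be serialized after the END_OF_ROW marker (CASSANDRA-10791). As an isolated sketch of that mutation shape, reusing the same helpers as the test (ks, cfname, cellname and ByteBufferUtil assumed in scope; the key is a hypothetical example):

    // Sketch: a row whose range tombstone sorts after its last live cell,
    // the shape that triggered CASSANDRA-10791. Helper names as in the test.
    Mutation rm = new Mutation(ks, ByteBufferUtil.bytes("badkey"));
    rm.add(cfname, cellname(1), ByteBufferUtil.bytes("value"), 2);
    ColumnFamily cf = rm.addOrGet(cfname);
    int now = (int) (System.currentTimeMillis() / 1000);
    // range tombstone over cells 8 through 10, sorting after the live cell 1
    cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, now));
    rm.applyUnsafe();
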
[4/5] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.1
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b97fa110
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b97fa110
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b97fa110
Branch: refs/heads/trunk
Commit: b97fa1101e2ad8d4517117113ac2a40a5387e289
Parents: 38eaf7f bb6973a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 15:09:19 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 15:09:19 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------