Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/30 16:18:29 UTC
[01/10] git commit: update COLD_READS_TO_OMIT_KEY
Updated Branches:
refs/heads/cassandra-2.0 7187a8af0 -> d4b5b0dbc
refs/heads/trunk ba3c1bcb9 -> d39ec221b
update COLD_READS_TO_OMIT_KEY
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3a75035
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3a75035
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3a75035
Branch: refs/heads/trunk
Commit: f3a75035ac105d032988d8b26e562458ac469699
Parents: 9a94494
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:39:30 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500
----------------------------------------------------------------------
.../compaction/SizeTieredCompactionStrategyOptions.java | 10 +++++-----
.../db/compaction/SizeTieredCompactionStrategyTest.java | 12 ++++++------
2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3a75035/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index 711ec6e..c6c5f1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -30,7 +30,7 @@ public final class SizeTieredCompactionStrategyOptions
protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
protected static final String BUCKET_LOW_KEY = "bucket_low";
protected static final String BUCKET_HIGH_KEY = "bucket_high";
- protected static final String MAX_COLD_READS_RATIO_KEY = "max_cold_reads_ratio";
+ protected static final String COLD_READS_TO_OMIT_KEY = "cold_reads_to_omit";
protected long minSSTableSize;
protected double bucketLow;
@@ -45,7 +45,7 @@ public final class SizeTieredCompactionStrategyOptions
bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
optionValue = options.get(BUCKET_HIGH_KEY);
bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
- optionValue = options.get(MAX_COLD_READS_RATIO_KEY);
+ optionValue = options.get(COLD_READS_TO_OMIT_KEY);
coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT : Double.parseDouble(optionValue);
}
@@ -94,17 +94,17 @@ public final class SizeTieredCompactionStrategyOptions
BUCKET_HIGH_KEY, bucketHigh, BUCKET_LOW_KEY, bucketLow));
}
- double maxColdReadsRatio = parseDouble(options, MAX_COLD_READS_RATIO_KEY, DEFAULT_COLD_READS_TO_OMIT);
+ double maxColdReadsRatio = parseDouble(options, COLD_READS_TO_OMIT_KEY, DEFAULT_COLD_READS_TO_OMIT);
if (maxColdReadsRatio < 0.0 || maxColdReadsRatio > 1.0)
{
throw new ConfigurationException(String.format("%s value (%s) should be between between 0.0 and 1.0",
- MAX_COLD_READS_RATIO_KEY, optionValue));
+ COLD_READS_TO_OMIT_KEY, optionValue));
}
uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
uncheckedOptions.remove(BUCKET_LOW_KEY);
uncheckedOptions.remove(BUCKET_HIGH_KEY);
- uncheckedOptions.remove(MAX_COLD_READS_RATIO_KEY);
+ uncheckedOptions.remove(COLD_READS_TO_OMIT_KEY);
return uncheckedOptions;
}
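For illustration, here is a minimal standalone sketch (plain Java, not the Cassandra classes; names are illustrative) of how the renamed option is read and range-checked, mirroring the logic in the diff above:

    import java.util.HashMap;
    import java.util.Map;

    public class ColdReadsOptionSketch
    {
        static final String COLD_READS_TO_OMIT_KEY = "cold_reads_to_omit"; // new key name
        static final double DEFAULT_COLD_READS_TO_OMIT = 0.0;

        public static void main(String[] args)
        {
            Map<String, String> options = new HashMap<>();
            options.put(COLD_READS_TO_OMIT_KEY, "0.05"); // omit the coldest 5% of reads/sec

            String optionValue = options.get(COLD_READS_TO_OMIT_KEY);
            double coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT
                                                         : Double.parseDouble(optionValue);
            // the real validateOptions throws ConfigurationException here
            if (coldReadsToOmit < 0.0 || coldReadsToOmit > 1.0)
                throw new IllegalArgumentException(COLD_READS_TO_OMIT_KEY + " must be between 0.0 and 1.0");

            System.out.println("accepted " + COLD_READS_TO_OMIT_KEY + " = " + coldReadsToOmit);
        }
    }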
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3a75035/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 5e79bd8..6bfa4e8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -49,7 +49,7 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
public void testOptionsValidation() throws ConfigurationException
{
Map<String, String> options = new HashMap<>();
- options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.35");
+ options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "0.35");
options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, "1.5");
options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "10000");
@@ -58,21 +58,21 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
try
{
- options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "-0.5");
+ options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "-0.5");
validateOptions(options);
- fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+ fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY));
}
catch (ConfigurationException e) {}
try
{
- options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "10.0");
+ options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "10.0");
validateOptions(options);
- fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+ fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY));
}
catch (ConfigurationException e)
{
- options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.25");
+ options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "0.25");
}
try
[04/10] git commit: Remove CHANGES.txt dups
Posted by jb...@apache.org.
Remove CHANGES.txt dups
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a38dd52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a38dd52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a38dd52
Branch: refs/heads/trunk
Commit: 0a38dd52f284ee75b19850fc98c2e77df6141817
Parents: f3a7503
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Oct 30 00:16:18 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Oct 30 00:16:58 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a38dd52/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4815c1c..e73a2b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -81,8 +81,6 @@ Merged from 1.2:
* Rework token replacement to use replace_address (CASSANDRA-5916)
* Fix altering column types (CASSANDRA-6185)
* cqlsh: fix CREATE/ALTER WITH completion (CASSANDRA-6196)
- * Fix altering column types (CASSANDRA-6185)
- * cqlsh: fix CREATE/ALTER WITH completion (CASSANDRA-6196)
* add windows bat files for shell commands (CASSANDRA-6145)
* Fix potential stack overflow during range tombstones insertion (CASSANDRA-6181)
* (Hadoop) Make LOCAL_ONE the default consistency level (CASSANDRA-6214)
[07/10] git commit: Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274
Posted by jb...@apache.org.
Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4b5b0db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4b5b0db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4b5b0db
Branch: refs/heads/trunk
Commit: d4b5b0dbc541b4b6249ccd1b507c777d7fc0bc4f
Parents: 7187a8a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:14:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:14:15 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 22 ++++----
.../db/compaction/AbstractCompactedRow.java | 4 +-
.../db/compaction/CompactionController.java | 2 +-
.../db/compaction/LazilyCompactedRow.java | 34 ++++++++----
.../cassandra/db/compaction/Scrubber.java | 52 ++++++++++++-------
.../cassandra/io/sstable/SSTableWriter.java | 2 +-
test/data/serialization/2.0/db.RowMutation.bin | Bin 3599 -> 3599 bytes
8 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73a2b5..31b944c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
* Compact hottest sstables first and optionally omit coldest from
compaction entirely (CASSANDRA-6109)
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..75a06d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,20 +140,22 @@ public class ColumnIndex
}
ColumnIndex index = build();
- finish();
+ maybeWriteEmptyRowHeader();
return index;
}
- public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+ /**
+ * The important distinction wrt build() is that we may be building for a row that ends up
+ * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+ * columns shadowed by an expired tombstone). Thus, it is the caller's responsibility
+ * to decide whether to write the header for an empty row.
+ */
+ public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
{
- for (OnDiskAtom c : columns)
- add(c);
- ColumnIndex index = build();
-
- finish();
-
- return index;
+ while (columns.hasNext())
+ add(columns.next());
+ return build();
}
public void add(OnDiskAtom column) throws IOException
@@ -219,7 +221,7 @@ public class ColumnIndex
return result;
}
- public void finish() throws IOException
+ public void maybeWriteEmptyRowHeader() throws IOException
{
if (!deletionInfo.isLive())
maybeWriteRowHeader();
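The rename from finish() to maybeWriteEmptyRowHeader() makes the calling contract explicit. As a toy sketch (illustrative types only, not the real ColumnIndex), the compaction path builds the index without committing to a header, and the caller decides whether an empty row still deserves one:

    import java.util.Iterator;
    import java.util.List;

    class ToyIndexBuilder
    {
        private final boolean hasRowTombstone;
        private int cells;

        ToyIndexBuilder(boolean hasRowTombstone) { this.hasRowTombstone = hasRowTombstone; }

        // compaction path: index the cells, write no header for an empty row
        int buildForCompaction(Iterator<String> columns)
        {
            while (columns.hasNext()) { columns.next(); cells++; }
            return cells;
        }

        // caller's decision: an otherwise-empty row is still written when a
        // row-level tombstone must survive (!deletionInfo.isLive() in the real code)
        void maybeWriteEmptyRowHeader()
        {
            if (hasRowTombstone)
                System.out.println("writing header for tombstone-only row");
        }

        public static void main(String[] args)
        {
            ToyIndexBuilder builder = new ToyIndexBuilder(true);
            builder.buildForCompaction(List.of("a", "b").iterator());
            builder.maybeWriteEmptyRowHeader(); // deferred to the caller, per the javadoc
        }
    }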
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
/**
* write the row (size + column index + filter + column data, but NOT row key) to @param out.
- * It is an error to call this if isEmpty is false. (Because the key is appended first,
- * so we'd have an incomplete row written.)
*
* write() may change internal state; it is NOT valid to call write() or update() a second time.
+ *
+ * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
*/
public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..65515d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -154,7 +154,7 @@ public class CompactionController
/**
* @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
- * are included in the compaction set
+ * older than @param maxDeletionTimestamp are included in the compaction set
*/
public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..0cdbbb7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -52,13 +52,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
private final List<? extends OnDiskAtomIterator> rows;
private final CompactionController controller;
private final boolean shouldPurge;
- private ColumnFamily emptyColumnFamily;
+ private final ColumnFamily emptyColumnFamily;
private Reducer reducer;
private ColumnStats columnStats;
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
- private long maxDelTimestamp;
+ private long maxTombstoneTimestamp;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@ -67,18 +67,31 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- maxDelTimestamp = Long.MIN_VALUE;
+ ColumnFamily rawCf = null;
+ maxTombstoneTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
- maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
+ maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
- if (emptyColumnFamily == null)
- emptyColumnFamily = cf;
+ if (rawCf == null)
+ rawCf = cf;
else
- emptyColumnFamily.delete(cf);
+ rawCf.delete(cf);
}
- this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+
+ // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
+ // tombstones newer than the row-level tombstones we've seen -- but we won't know that
+ // until we iterate over them. By passing MAX_VALUE we will only purge if there are
+ // no other versions of this row present.
+ this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+
+ // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+ // to get rid of redundant row-level and range tombstones
+ assert rawCf != null;
+ int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+ ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+ emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,7 +102,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
try
{
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- columnsIndex = indexBuilder.build(this);
+ columnsIndex = indexBuilder.buildForCompaction(iterator());
if (columnsIndex.columnsIndex.isEmpty())
{
boolean cfIrrelevant = shouldPurge
@@ -107,7 +120,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+ reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -115,6 +128,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
);
reducer = null;
+ indexBuilder.maybeWriteEmptyRowHeader();
out.writeShort(SSTableWriter.END_OF_ROW);
close();
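The Long.MAX_VALUE change deserves a worked example. A hedged stand-in for shouldPurge (illustrative numbers and simplified semantics, not the real CompactionController) shows why passing the row-tombstone timestamp would be unsafe:

    public class PurgeDecisionSketch
    {
        // simplified model: purging up to maxDeletionTimestamp is safe only if no
        // sstable outside the compaction holds this row's data older than that
        static boolean shouldPurge(long maxDeletionTimestamp, long oldestVersionOutsideCompaction)
        {
            return oldestVersionOutsideCompaction >= maxDeletionTimestamp;
        }

        public static void main(String[] args)
        {
            long rowTombstoneTs = 100;   // max row-level tombstone seen before iterating
            long outsideTs = 200;        // this row also lives in an sstable not being compacted

            // passing the row-tombstone timestamp looks safe (200 >= 100) ...
            System.out.println(shouldPurge(rowTombstoneTs, outsideTs));        // true
            // ... yet a cell tombstone at, say, 250 only shows up during iteration
            // and would be purged wrongly; MAX_VALUE purges only when no other
            // version of the row exists at all
            System.out.println(shouldPurge(Long.MAX_VALUE, outsideTs));        // false
        }
    }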
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index d8a2745..708e929 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
private final OutputHandler outputHandler;
- private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+ private static final Comparator<Row> rowComparator = new Comparator<Row>()
{
- public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+ public int compare(Row r1, Row r2)
{
return r1.key.compareTo(r2.key);
}
};
- private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+ private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
{
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
public void scrub()
{
- outputHandler.output("Scrubbing " + sstable);
+ outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
// TODO errors when creating the writer may leave empty temp files.
writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- AbstractCompactedRow prevRow = null;
+ DecoratedKey prevKey = null;
while (!dataFile.isEOF())
{
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
if (dataSize > dataFile.length())
throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
outputHandler.warn("Index file contained a different key or row size; using key from data file");
}
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
key = sstable.partitioner.decorateKey(currentIndexKey);
try
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
}
catch (Throwable th2)
{
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
if (!outOfOrderRows.isEmpty())
{
SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- for (AbstractCompactedRow row : outOfOrderRows)
- inOrderWriter.append(row);
+ for (Row row : outOfOrderRows)
+ inOrderWriter.append(row.key, row.cf);
newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
}
}
+ private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+ {
+ // TODO bitch if the row is too large? if it is there's not much we can do ...
+ outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
+ // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+ ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+ while (atoms.hasNext())
+ {
+ OnDiskAtom atom = atoms.next();
+ cf.addAtom(atom);
+ }
+ outOfOrderRows.add(new Row(key, cf));
+ }
+
public SSTableReader getNewSSTable()
{
return newSstable;
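As a rough illustration of the salvage strategy (a standalone sketch with made-up types, not the real Scrubber), out-of-order rows are buffered in a key-sorted structure and written to a separate in-order sstable at the end, while a tree-backed container absorbs mis-sorted cells within each row:

    import java.util.*;

    public class OutOfOrderSalvageSketch
    {
        public static void main(String[] args)
        {
            // rows arriving as (key -> cells), key order possibly broken
            List<Map.Entry<String, List<String>>> arriving = List.of(
                Map.entry("b", List.of("c2", "c1")),  // cells mis-sorted too
                Map.entry("a", List.of("c1")),        // out of order: "a" after "b"
                Map.entry("c", List.of("c3")));

            SortedMap<String, SortedSet<String>> outOfOrder = new TreeMap<>();
            String prevKey = null;
            for (Map.Entry<String, List<String>> row : arriving)
            {
                if (prevKey != null && prevKey.compareTo(row.getKey()) > 0)
                {
                    // like saveOutOfOrderRow: a TreeSet sorts the cells instead
                    // of failing on them, and the row is set aside
                    outOfOrder.put(row.getKey(), new TreeSet<>(row.getValue()));
                    continue;
                }
                prevKey = row.getKey();
                System.out.println("appended in place: " + row.getKey());
            }
            // finally, the salvaged rows go to a new sstable in sorted order
            outOfOrder.forEach((k, cells) -> System.out.println("salvaged " + k + " -> " + cells));
        }
    }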
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
columnIndexer.add(atom); // This writes the atom on disk too
}
- columnIndexer.finish();
+ columnIndexer.maybeWriteEmptyRowHeader();
dataFile.stream.writeShort(END_OF_ROW);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ
[10/10] git commit: merge from 2.0
Posted by jb...@apache.org.
merge from 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d39ec221
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d39ec221
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d39ec221
Branch: refs/heads/trunk
Commit: d39ec221be61b59805eb27d5e767f1958678ac3f
Parents: f1d5b5d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:18:23 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:18:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d39ec221/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cdbbe1..5a2e56b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
* Compact hottest sstables first and optionally omit coldest from
compaction entirely (CASSANDRA-6109)
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
[05/10] git commit: Make MS.getBoundTerms() correct for batched statements
Posted by jb...@apache.org.
Make MS.getBoundTerms() correct for batched statements
patch by Aleksey Yeschenko and Sam Tunnicliffe; reviewed by Sylvain Lebresne
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7187a8af
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7187a8af
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7187a8af
Branch: refs/heads/trunk
Commit: 7187a8af05b53d5318f3e4fa47122b232c4c7e52
Parents: 0a38dd5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Oct 30 13:20:47 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Oct 30 13:20:47 2013 +0300
----------------------------------------------------------------------
.../apache/cassandra/cql3/VariableSpecifications.java | 7 +++++++
.../cassandra/cql3/statements/DeleteStatement.java | 6 +++---
.../cassandra/cql3/statements/ModificationStatement.java | 11 ++++++++---
.../cassandra/cql3/statements/UpdateStatement.java | 8 ++++----
4 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
index ecdba6f..297999a 100644
--- a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
+++ b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
@@ -24,6 +24,7 @@ public class VariableSpecifications
{
private final List<ColumnIdentifier> variableNames;
private final ColumnSpecification[] specs;
+ private int collectedCount;
public VariableSpecifications(List<ColumnIdentifier> variableNames)
{
@@ -48,5 +49,11 @@ public class VariableSpecifications
if (name != null)
spec = new ColumnSpecification(spec.ksName, spec.cfName, name, spec.type);
specs[bindIndex] = spec;
+ collectedCount++;
+ }
+
+ public int getCollectedCount()
+ {
+ return collectedCount;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 3704e14..4852cf7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -31,9 +31,9 @@ import org.apache.cassandra.utils.Pair;
*/
public class DeleteStatement extends ModificationStatement
{
- private DeleteStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
+ private DeleteStatement(CFMetaData cfm, Attributes attrs)
{
- super(boundTerms, cfm, attrs);
+ super(cfm, attrs);
}
public boolean requireFullClusteringKey()
@@ -105,7 +105,7 @@ public class DeleteStatement extends ModificationStatement
protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
{
- DeleteStatement stmt = new DeleteStatement(boundNames.size(), cfDef.cfm, attrs);
+ DeleteStatement stmt = new DeleteStatement(cfDef.cfm, attrs);
for (Operation.RawDeletion deletion : deletions)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0f425b8..7aebc48 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -46,19 +46,18 @@ public abstract class ModificationStatement implements CQLStatement
{
private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
- private final int boundTerms;
public final CFMetaData cfm;
public final Attributes attrs;
private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier, Restriction>();
private final List<Operation> columnOperations = new ArrayList<Operation>();
+ private int boundTerms;
private List<Operation> columnConditions;
private boolean ifNotExists;
- public ModificationStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
+ public ModificationStatement(CFMetaData cfm, Attributes attrs)
{
- this.boundTerms = boundTerms;
this.cfm = cfm;
this.attrs = attrs;
}
@@ -579,6 +578,10 @@ public abstract class ModificationStatement implements CQLStatement
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
CFDefinition cfDef = metadata.getCfDef();
+ // The collected count at the beginning of preparation.
+ // Will start at non-zero for statements nested inside a BatchStatement (the second and subsequent ones).
+ int collected = boundNames.getCollectedCount();
+
Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
preparedAttributes.collectMarkerSpecification(boundNames);
@@ -634,6 +637,8 @@ public abstract class ModificationStatement implements CQLStatement
}
}
}
+
+ stmt.boundTerms = boundNames.getCollectedCount() - collected;
return stmt;
}
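To see why the collected-count delta fixes getBoundTerms() for batches, consider a hedged sketch (plain Java, not the cql3 classes): all statements in a batch share one collector of bind markers, and each statement's boundTerms is the difference between the shared count after and before its own preparation:

    import java.util.ArrayList;
    import java.util.List;

    public class BoundTermsSketch
    {
        // shared across the whole batch, like VariableSpecifications
        static class Variables
        {
            private int collectedCount;
            void collectMarker() { collectedCount++; }
            int getCollectedCount() { return collectedCount; }
        }

        public static void main(String[] args)
        {
            Variables boundNames = new Variables();
            List<Integer> boundTerms = new ArrayList<>();

            int[] markersPerStatement = { 2, 3 }; // e.g. an UPDATE with ?,? then a DELETE with ?,?,?
            for (int markers : markersPerStatement)
            {
                int collected = boundNames.getCollectedCount(); // non-zero for the second statement
                for (int i = 0; i < markers; i++)
                    boundNames.collectMarker();                 // happens during prepare()
                boundTerms.add(boundNames.getCollectedCount() - collected);
            }
            System.out.println(boundTerms); // [2, 3] -- per statement, not the batch-wide total
        }
    }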
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 12348df..a387962 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -35,9 +35,9 @@ public class UpdateStatement extends ModificationStatement
{
private static final Operation setToEmptyOperation = new Constants.Setter(null, new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER));
- private UpdateStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
+ private UpdateStatement(CFMetaData cfm, Attributes attrs)
{
- super(boundTerms, cfm, attrs);
+ super(cfm, attrs);
}
public boolean requireFullClusteringKey()
@@ -131,7 +131,7 @@ public class UpdateStatement extends ModificationStatement
protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
{
- UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfDef.cfm, attrs);
+ UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
// Created from an INSERT
if (stmt.isCounter())
@@ -201,7 +201,7 @@ public class UpdateStatement extends ModificationStatement
protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
{
- UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfDef.cfm, attrs);
+ UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : updates)
{
[09/10] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1d5b5d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1d5b5d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1d5b5d2
Branch: refs/heads/trunk
Commit: f1d5b5d2af559705583fe52cdbc1c95d8af79862
Parents: 36cdf34 d4b5b0d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:18:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:18:15 2013 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[03/10] git commit: Compact hottest sstables first and optionally omit coldest
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109
Posted by jb...@apache.org.
Compact hottest sstables first and optionally omit coldest
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37285304
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37285304
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37285304
Branch: refs/heads/trunk
Commit: 37285304ee484122410c977399024f2af132753c
Parents: 5eddf18
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:25:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../SizeTieredCompactionStrategy.java | 149 +++++++++++---
.../SizeTieredCompactionStrategyOptions.java | 53 ++---
.../cassandra/io/sstable/SSTableReader.java | 4 +-
.../SizeTieredCompactionStrategyTest.java | 192 ++++++++++++++++++-
5 files changed, 341 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bf7f21..4815c1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.3
+ * Compact hottest sstables first and optionally omit coldest from
+ compaction entirely (CASSANDRA-6109)
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
* cqlsh: fix LIST USERS output (CASSANDRA-6242)
* Add IRequestSink interface (CASSANDRA-6248)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index cee5f97..5115860 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -20,8 +20,9 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.Map.Entry;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +55,10 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
int minThreshold = cfs.getMinimumCompactionThreshold();
int maxThreshold = cfs.getMaximumCompactionThreshold();
- Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
- List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), options.bucketHigh, options.bucketLow, options.minSSTableSize);
+ Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+ candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+
+ List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
logger.debug("Compaction buckets are {}", buckets);
updateEstimatedCompactionsByTasks(buckets);
List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
@@ -77,34 +80,88 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return Collections.singletonList(sstablesWithTombstones.get(0));
}
+ /**
+ * Removes as many cold sstables as possible while retaining at least (1 - coldReadsToOmit) of the total reads/sec
+ * across all sstables
+ * @param sstables all sstables to consider
+ * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+ * @return a list of sstables with the coldest sstables excluded until the reads they represent reach coldReadsToOmit
+ */
+ @VisibleForTesting
+ static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+ {
+ // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+ Collections.sort(sstables, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ int comparison = Double.compare(hotness(o1), hotness(o2));
+ if (comparison != 0)
+ return comparison;
+
+ return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+ }
+ });
+
+ // calculate the total reads/sec across all sstables
+ double totalReads = 0.0;
+ for (SSTableReader sstr : sstables)
+ if (sstr.readMeter != null)
+ totalReads += sstr.readMeter.twoHourRate();
+
+ // if this is a system table with no read meters or we don't have any read rates yet, just return them all
+ if (totalReads == 0.0)
+ return sstables;
+
+ // iteratively ignore the coldest sstables until ignoring one more would put us over the coldReadsToOmit threshold
+ double maxColdReads = coldReadsToOmit * totalReads;
+
+ double totalColdReads = 0.0;
+ int cutoffIndex = 0;
+ while (cutoffIndex < sstables.size())
+ {
+ double reads = sstables.get(cutoffIndex).readMeter.twoHourRate();
+ if (totalColdReads + reads > maxColdReads)
+ break;
+
+ totalColdReads += reads;
+ cutoffIndex++;
+ }
+
+ return sstables.subList(cutoffIndex, sstables.size());
+ }
+
+ /**
+ * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
+ * @param minThreshold minimum number of sstables in a bucket to qualify as interesting
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this)
+ * @return a bucket (list) of sstables to compact
+ */
public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
{
- // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold entries
- List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
+ // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables
+ final List<Pair<List<SSTableReader>, Double>> prunedBucketsAndHotness = new ArrayList<>(buckets.size());
for (List<SSTableReader> bucket : buckets)
{
- if (bucket.size() < minThreshold)
- continue;
-
- Collections.sort(bucket, new Comparator<SSTableReader>()
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- return o1.descriptor.generation - o2.descriptor.generation;
- }
- });
- List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
- prunedBuckets.add(prunedBucket);
+ Pair<List<SSTableReader>, Double> bucketAndHotness = trimToThresholdWithHotness(bucket, maxThreshold);
+ if (bucketAndHotness != null && bucketAndHotness.left.size() >= minThreshold)
+ prunedBucketsAndHotness.add(bucketAndHotness);
}
- if (prunedBuckets.isEmpty())
+ if (prunedBucketsAndHotness.isEmpty())
return Collections.emptyList();
- // prefer compacting buckets with smallest average size; that will yield the fastest improvement for read performance
- return Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>()
+ // prefer compacting the hottest bucket
+ Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, new Comparator<Pair<List<SSTableReader>, Double>>()
{
- public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
+ public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
{
- return Longs.compare(avgSize(o1), avgSize(o2));
+ int comparison = Double.compare(o1.right, o2.right);
+ if (comparison != 0)
+ return comparison;
+
+ // break ties by compacting the smallest sstables first (this will probably only happen for
+ // system tables and new/unread sstables)
+ return Long.compare(avgSize(o1.left), avgSize(o2.left));
}
private long avgSize(List<SSTableReader> sstables)
@@ -115,6 +172,44 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return n / sstables.size();
}
});
+
+ return hottest.left;
+ }
+
+ /**
+ * Returns a (bucket, hotness) pair or null if there were not enough sstables in the bucket to meet minThreshold.
+ * If there are more than maxThreshold sstables, the coldest sstables will be trimmed to meet the threshold.
+ **/
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, Double> trimToThresholdWithHotness(List<SSTableReader> bucket, int maxThreshold)
+ {
+ // sort by sstable hotness (descending)
+ Collections.sort(bucket, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return -1 * Double.compare(hotness(o1), hotness(o2));
+ }
+ });
+
+ // and then trim the coldest sstables off the end to meet the maxThreshold
+ List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+
+ // bucket hotness is the sum of the hotness of all sstable members
+ double bucketHotness = 0.0;
+ for (SSTableReader sstr : prunedBucket)
+ bucketHotness += hotness(sstr);
+
+ return Pair.create(prunedBucket, bucketHotness);
+ }
+
+ /**
+ * Returns the reads per second per key for this sstable, or 0.0 if the sstable has no read meter
+ */
+ private static double hotness(SSTableReader sstr)
+ {
+ // system tables don't have read meters, just use 0.0 for the hotness
+ return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
}
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
@@ -124,13 +219,13 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
while (true)
{
- List<SSTableReader> smallestBucket = getNextBackgroundSSTables(gcBefore);
+ List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
- if (smallestBucket.isEmpty())
+ if (hottestBucket.isEmpty())
return null;
- if (cfs.getDataTracker().markCompacting(smallestBucket))
- return new CompactionTask(cfs, smallestBucket, gcBefore);
+ if (cfs.getDataTracker().markCompacting(hottestBucket))
+ return new CompactionTask(cfs, hottestBucket, gcBefore);
}
}
@@ -253,4 +348,4 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
cfs.getMinimumCompactionThreshold(),
cfs.getMaximumCompactionThreshold());
}
-}
+}
\ No newline at end of file
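A worked example of the cold-sstable filter may help (a standalone sketch with made-up read rates; the cutoff logic mirrors filterColdSSTables above): sort coldest-first, then drop sstables while their cumulative reads/sec stay within coldReadsToOmit of the total:

    import java.util.*;

    public class FilterColdSketch
    {
        record SSTable(String name, double readsPerSec) {}

        static List<SSTable> filterCold(List<SSTable> sstables, double coldReadsToOmit)
        {
            List<SSTable> sorted = new ArrayList<>(sstables);
            sorted.sort(Comparator.comparingDouble(SSTable::readsPerSec)); // coldest first

            double totalReads = sorted.stream().mapToDouble(SSTable::readsPerSec).sum();
            if (totalReads == 0.0)
                return sorted; // no read rates yet: filter nothing

            double maxColdReads = coldReadsToOmit * totalReads;
            double coldSoFar = 0.0;
            int cutoff = 0;
            // drop the coldest until one more would exceed the omission budget
            while (cutoff < sorted.size() && coldSoFar + sorted.get(cutoff).readsPerSec() <= maxColdReads)
                coldSoFar += sorted.get(cutoff++).readsPerSec();
            return sorted.subList(cutoff, sorted.size());
        }

        public static void main(String[] args)
        {
            // total = 100 reads/sec, threshold 2.5%: budget is 2.5 reads/sec, so two
            // of the 1.0-rate sstables are omitted but the third would overshoot
            List<SSTable> sstables = List.of(
                new SSTable("hot", 97.0), new SSTable("c1", 1.0),
                new SSTable("c2", 1.0), new SSTable("c3", 1.0));
            System.out.println(filterCold(sstables, 0.025)); // keeps one cold sstable plus "hot"
        }
    }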
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index d7c9075..711ec6e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -26,31 +26,48 @@ public final class SizeTieredCompactionStrategyOptions
protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
protected static final double DEFAULT_BUCKET_LOW = 0.5;
protected static final double DEFAULT_BUCKET_HIGH = 1.5;
+ protected static final double DEFAULT_COLD_READS_TO_OMIT = 0.0;
protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
protected static final String BUCKET_LOW_KEY = "bucket_low";
protected static final String BUCKET_HIGH_KEY = "bucket_high";
+ protected static final String MAX_COLD_READS_RATIO_KEY = "max_cold_reads_ratio";
protected long minSSTableSize;
protected double bucketLow;
protected double bucketHigh;
+ protected double coldReadsToOmit;
public SizeTieredCompactionStrategyOptions(Map<String, String> options)
{
-
String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
minSSTableSize = optionValue == null ? DEFAULT_MIN_SSTABLE_SIZE : Long.parseLong(optionValue);
optionValue = options.get(BUCKET_LOW_KEY);
bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
optionValue = options.get(BUCKET_HIGH_KEY);
bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
+ optionValue = options.get(MAX_COLD_READS_RATIO_KEY);
+ coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT : Double.parseDouble(optionValue);
}
public SizeTieredCompactionStrategyOptions()
{
-
minSSTableSize = DEFAULT_MIN_SSTABLE_SIZE;
bucketLow = DEFAULT_BUCKET_LOW;
bucketHigh = DEFAULT_BUCKET_HIGH;
+ coldReadsToOmit = DEFAULT_COLD_READS_TO_OMIT;
+ }
+
+ private static double parseDouble(Map<String, String> options, String key, double defaultValue) throws ConfigurationException
+ {
+ String optionValue = options.get(key);
+ try
+ {
+ return optionValue == null ? defaultValue : Double.parseDouble(optionValue);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(String.format("%s is not a parsable float for %s", optionValue, key), e);
+ }
}
public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
@@ -69,36 +86,26 @@ public final class SizeTieredCompactionStrategyOptions
throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, MIN_SSTABLE_SIZE_KEY), e);
}
- double bucketLow, bucketHigh;
- optionValue = options.get(BUCKET_LOW_KEY);
- try
- {
- bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
- }
- catch (NumberFormatException e)
- {
- throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_LOW), e);
- }
-
- optionValue = options.get(BUCKET_HIGH_KEY);
- try
- {
- bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
- }
- catch (NumberFormatException e)
+ double bucketLow = parseDouble(options, BUCKET_LOW_KEY, DEFAULT_BUCKET_LOW);
+ double bucketHigh = parseDouble(options, BUCKET_HIGH_KEY, DEFAULT_BUCKET_HIGH);
+ if (bucketHigh <= bucketLow)
{
- throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_HIGH), e);
+ throw new ConfigurationException(String.format("%s value (%s) is less than or equal to the %s value (%s)",
+ BUCKET_HIGH_KEY, bucketHigh, BUCKET_LOW_KEY, bucketLow));
}
- if (bucketHigh <= bucketLow)
+ double maxColdReadsRatio = parseDouble(options, MAX_COLD_READS_RATIO_KEY, DEFAULT_COLD_READS_TO_OMIT);
+ if (maxColdReadsRatio < 0.0 || maxColdReadsRatio > 1.0)
{
- throw new ConfigurationException(String.format("Bucket high value (%s) is less than or equal bucket low value (%s)", bucketHigh, bucketLow));
+ throw new ConfigurationException(String.format("%s value (%s) should be between 0.0 and 1.0",
+ MAX_COLD_READS_RATIO_KEY, optionValue));
}
uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
uncheckedOptions.remove(BUCKET_LOW_KEY);
uncheckedOptions.remove(BUCKET_HIGH_KEY);
+ uncheckedOptions.remove(MAX_COLD_READS_RATIO_KEY);
return uncheckedOptions;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 9837f4c..c961d44 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
@@ -103,7 +104,8 @@ public class SSTableReader extends SSTable implements Closeable
private final AtomicLong keyCacheHit = new AtomicLong(0);
private final AtomicLong keyCacheRequest = new AtomicLong(0);
- public final RestorableMeter readMeter;
+ @VisibleForTesting
+ public RestorableMeter readMeter;
public static long getApproximateKeyCount(Iterable<SSTableReader> sstables, CFMetaData metadata)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 89604c5..5e79bd8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -17,17 +17,80 @@
*/
package org.apache.cassandra.db.compaction;
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
+import java.util.*;
import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
-import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.getBuckets;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.mostInterestingBucket;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.trimToThresholdWithHotness;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.filterColdSSTables;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.validateOptions;
-public class SizeTieredCompactionStrategyTest
+import static org.junit.Assert.*;
+
+public class SizeTieredCompactionStrategyTest extends SchemaLoader
{
+
+ @Test
+ public void testOptionsValidation() throws ConfigurationException
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.35");
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, "1.5");
+ options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "10000");
+ Map<String, String> unvalidated = validateOptions(options);
+ assertTrue(unvalidated.isEmpty());
+
+ try
+ {
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "-0.5");
+ validateOptions(options);
+ fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+ }
+ catch (ConfigurationException e) {}
+
+ try
+ {
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "10.0");
+ validateOptions(options);
+ fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.25");
+ }
+
+ try
+ {
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "1000.0");
+ validateOptions(options);
+ fail("bucket_low greater than bucket_high should be rejected");
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+ }
+
+ options.put("bad_option", "1.0");
+ unvalidated = validateOptions(options);
+ assertTrue(unvalidated.containsKey("bad_option"));
+ }
+
@Test
public void testGetBuckets()
{
@@ -39,7 +102,7 @@ public class SizeTieredCompactionStrategyTest
pairs.add(pair);
}
- List<List<String>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+ List<List<String>> buckets = getBuckets(pairs, 1.5, 0.5, 2);
assertEquals(3, buckets.size());
for (List<String> bucket : buckets)
@@ -59,7 +122,7 @@ public class SizeTieredCompactionStrategyTest
pairs.add(pair);
}
- buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+ buckets = getBuckets(pairs, 1.5, 0.5, 2);
assertEquals(2, buckets.size());
for (List<String> bucket : buckets)
@@ -80,7 +143,120 @@ public class SizeTieredCompactionStrategyTest
pairs.add(pair);
}
- buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 10);
+ buckets = getBuckets(pairs, 1.5, 0.5, 10);
assertEquals(1, buckets.size());
}
-}
+
+ @Test
+ public void testPrepBucket() throws Exception
+ {
+ String ksname = "Keyspace1";
+ String cfname = "Standard1";
+ Keyspace keyspace = Keyspace.open(ksname);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 3 sstables
+ int numSSTables = 3;
+ for (int r = 0; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ RowMutation rm = new RowMutation(ksname, key.key);
+ rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+ cfs.forceBlockingFlush();
+
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+ Pair<List<SSTableReader>, Double> bucket;
+
+ List<SSTableReader> interestingBucket = mostInterestingBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32);
+ assertTrue("nothing should be returned when all buckets are below the min threshold", interestingBucket.isEmpty());
+
+ sstrs.get(0).readMeter = new RestorableMeter(100.0, 100.0);
+ sstrs.get(1).readMeter = new RestorableMeter(200.0, 200.0);
+ sstrs.get(2).readMeter = new RestorableMeter(300.0, 300.0);
+
+ long estimatedKeys = sstrs.get(0).estimatedKeys();
+
+ // if we have more than the max threshold, the coldest should be dropped
+ bucket = trimToThresholdWithHotness(sstrs, 2);
+ assertEquals("one bucket should have been dropped", 2, bucket.left.size());
+ double expectedBucketHotness = (200.0 + 300.0) / estimatedKeys;
+ assertEquals(String.format("bucket hotness (%f) should be close to %f", bucket.right, expectedBucketHotness),
+ expectedBucketHotness, bucket.right, 1.0);
+ }
+
+ @Test
+ public void testFilterColdSSTables() throws Exception
+ {
+ String ksname = "Keyspace1";
+ String cfname = "Standard1";
+ Keyspace keyspace = Keyspace.open(ksname);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 10 sstables
+ int numSSTables = 10;
+ for (int r = 0; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ RowMutation rm = new RowMutation(ksname, key.key);
+ rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+ cfs.forceBlockingFlush();
+
+ List<SSTableReader> filtered;
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = null;
+ filtered = filterColdSSTables(sstrs, 0.05);
+ assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
+
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(0.0, 0.0);
+ filtered = filterColdSSTables(sstrs, 0.05);
+ assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
+
+ // leave all read rates at 0 besides one
+ sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0);
+ filtered = filterColdSSTables(sstrs, 0.05);
+ assertEquals("there should only be one hot sstable", 1, filtered.size());
+ assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5);
+
+ // the total read rate is 100, and we'll set a threshold of 2.5%, so two of the sstables with read
+ // rate 1.0 should be ignored, but not the third
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(0.0, 0.0);
+ sstrs.get(0).readMeter = new RestorableMeter(97.0, 97.0);
+ sstrs.get(1).readMeter = new RestorableMeter(1.0, 1.0);
+ sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0);
+ sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0);
+
+ filtered = filterColdSSTables(sstrs, 0.025);
+ assertEquals(2, filtered.size());
+ assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + filtered.get(1).readMeter.twoHourRate(), 0.5);
+
+ // make sure a threshold of 0.0 doesn't result in any sstables being filtered
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(1.0, 1.0);
+ filtered = filterColdSSTables(sstrs, 0.0);
+ assertEquals(sstrs.size(), filtered.size());
+
+ // just for fun, set a threshold where all sstables are considered cold
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(1.0, 1.0);
+ filtered = filterColdSSTables(sstrs, 1.0);
+ assertTrue(filtered.isEmpty());
+ }
+}
\ No newline at end of file
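To make the behavior these tests pin down easier to follow, here is a minimal, self-contained sketch of the two rules they exercise: trimming an over-full bucket to its hottest members, and omitting the coldest sstables whose combined read rate stays under cold_reads_to_omit of the total. SimpleSSTable and the method names below are illustrative stand-ins, not Cassandra's API; this is a sketch of the idea, not the project's implementation.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Illustrative stand-in for an sstable with a read meter; not Cassandra's API.
    class SimpleSSTable
    {
        final String name;
        final Double twoHourRate; // null models a missing read meter

        SimpleSSTable(String name, Double twoHourRate)
        {
            this.name = name;
            this.twoHourRate = twoHourRate;
        }
    }

    public class ColdFilterSketch
    {
        // If a bucket exceeds maxThreshold, keep only the maxThreshold hottest
        // sstables (mirrors the "coldest should be dropped" assertion above).
        static List<SimpleSSTable> trimToThreshold(List<SimpleSSTable> bucket, int maxThreshold)
        {
            List<SimpleSSTable> sorted = new ArrayList<>(bucket);
            sorted.sort(Comparator.comparingDouble((SimpleSSTable s) -> -s.twoHourRate)); // hottest first
            return sorted.subList(0, Math.min(maxThreshold, sorted.size()));
        }

        // Drop the coldest sstables whose combined rate fits within
        // coldReadsToOmit * totalRate; with no meters (or an all-zero total),
        // filter nothing, matching the tests above.
        static List<SimpleSSTable> filterCold(List<SimpleSSTable> sstables, double coldReadsToOmit)
        {
            double totalRate = 0;
            for (SimpleSSTable s : sstables)
            {
                if (s.twoHourRate == null)
                    return sstables;
                totalRate += s.twoHourRate;
            }
            if (totalRate == 0)
                return sstables;

            List<SimpleSSTable> sorted = new ArrayList<>(sstables);
            sorted.sort(Comparator.comparingDouble((SimpleSSTable s) -> s.twoHourRate)); // coldest first

            double coldSoFar = 0;
            int firstKept = 0;
            for (SimpleSSTable s : sorted)
            {
                if (coldSoFar + s.twoHourRate > coldReadsToOmit * totalRate)
                    break;
                coldSoFar += s.twoHourRate;
                firstKept++;
            }
            return sorted.subList(firstKept, sorted.size());
        }
    }

With the rates from testFilterColdSSTables (97, 1, 1, 1 and six zeros) and coldReadsToOmit = 0.025, the budget is 2.5 reads/sec, so the zeros and two of the 1.0s drop, leaving two sstables with a combined rate of 98.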
[08/10] git commit: Remove multithreaded compaction and PCR patch by jbellis; reviewed by marcuse for CASSANDRA-6142
Posted by jb...@apache.org.
Remove multithreaded compaction and PCR
patch by jbellis; reviewed by marcuse for CASSANDRA-6142
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36cdf34b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36cdf34b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36cdf34b
Branch: refs/heads/trunk
Commit: 36cdf34bd92ede5ad99447e10d90e6caa1fd743a
Parents: ba3c1bc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:17:23 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:17:23 2013 -0500
----------------------------------------------------------------------
conf/cassandra.yaml | 7 -
.../org/apache/cassandra/config/Config.java | 1 -
.../cassandra/config/DatabaseDescriptor.java | 7 -
.../org/apache/cassandra/db/ColumnIndex.java | 19 +-
.../db/compaction/AbstractCompactedRow.java | 4 +-
.../db/compaction/CompactionController.java | 58 +--
.../db/compaction/CompactionIterable.java | 9 +-
.../db/compaction/CompactionManager.java | 6 +-
.../cassandra/db/compaction/CompactionTask.java | 4 +-
.../db/compaction/LazilyCompactedRow.java | 91 +++--
.../compaction/ParallelCompactionIterable.java | 403 -------------------
.../db/compaction/PrecompactedRow.java | 202 ----------
.../db/compaction/SSTableSplitter.java | 4 +-
.../cassandra/db/compaction/Scrubber.java | 56 ++-
.../cassandra/db/compaction/Upgrader.java | 4 +-
.../io/sstable/IndexSummaryBuilder.java | 2 +-
.../cassandra/io/sstable/SSTableWriter.java | 2 +-
test/data/serialization/2.0/db.RowMutation.bin | Bin 3599 -> 3599 bytes
.../db/compaction/CompactionsPurgeTest.java | 32 +-
.../cassandra/io/LazilyCompactedRowTest.java | 319 ---------------
.../apache/cassandra/repair/ValidatorTest.java | 39 +-
21 files changed, 173 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 455421a..d6b5b7a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -443,13 +443,6 @@ in_memory_compaction_limit_in_mb: 64
# Uncomment to make compaction mono-threaded, the pre-0.8 default.
#concurrent_compactors: 1
-# Multi-threaded compaction. When enabled, each compaction will use
-# up to one thread per core, plus one thread per sstable being merged.
-# This is usually only useful for SSD-based hardware: otherwise,
-# your concern is usually to get compaction to do LESS i/o (see:
-# compaction_throughput_mb_per_sec), not more.
-multithreaded_compaction: false
-
# Throttles compaction to the given total throughput across the entire
# system. The faster you insert data, the faster you need to compact in
# order to keep the sstable count down, but in general, setting this to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 8f0f22e..e9f48e6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -109,7 +109,6 @@ public class Config
public Integer in_memory_compaction_limit_in_mb = 64;
public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
public volatile Integer compaction_throughput_mb_per_sec = 16;
- public Boolean multithreaded_compaction = false;
public Integer max_streaming_retries = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 90f1753..1997347 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -382,8 +382,6 @@ public class DatabaseDescriptor
logger.debug("setting auto_bootstrap to {}", conf.auto_bootstrap);
}
- logger.info("{}using multi-threaded compaction", (conf.multithreaded_compaction ? "" : "Not "));
-
if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
{
throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
@@ -856,11 +854,6 @@ public class DatabaseDescriptor
return conf.concurrent_compactors;
}
- public static boolean isMultithreadedCompaction()
- {
- return conf.multithreaded_compaction;
- }
-
public static int getCompactionThroughputMbPerSec()
{
return conf.compaction_throughput_mb_per_sec;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..6dd2028 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,19 +140,26 @@ public class ColumnIndex
}
ColumnIndex index = build();
- finish();
+ maybeWriteEmptyRowHeader();
return index;
}
- public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+ /**
+ * The important distinction wrt build() is that we may be building for a row that ends up
+ * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+ * columns shadowed by an expired tombstone). Thus, it is the caller's responsibility
+ * to decide whether to write the header for an empty row.
+ */
+ public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
{
- for (OnDiskAtom c : columns)
+ while (columns.hasNext())
+ {
+ OnDiskAtom c = columns.next();
add(c);
+ }
ColumnIndex index = build();
- finish();
-
return index;
}
@@ -219,7 +226,7 @@ public class ColumnIndex
return result;
}
- public void finish() throws IOException
+ public void maybeWriteEmptyRowHeader() throws IOException
{
if (!deletionInfo.isLive())
maybeWriteRowHeader();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
/**
* write the row (size + column index + filter + column data, but NOT row key) to @param out.
- * It is an error to call this if isEmpty is false. (Because the key is appended first,
- * so we'd have an incomplete row written.)
*
* write() may change internal state; it is NOT valid to call write() or update() a second time.
+ *
+ * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
*/
public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
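Since the updated javadocs turn write() into a contract where null means "this row compacted away entirely", a small self-contained sketch of the caller side may help. CompactedRow and append() below are stand-ins for the real AbstractCompactedRow/SSTableWriter types, and the bookkeeping mirrors the emptyRows counter in the Scrubber diff further down.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class AppendSketch
    {
        // Stand-in for AbstractCompactedRow.write(): null = nothing survived.
        interface CompactedRow
        {
            Object write(long position, DataOutput out) throws IOException;
        }

        static int emptyRows = 0;

        static boolean append(CompactedRow row, DataOutputStream out) throws IOException
        {
            if (row.write(out.size(), out) == null)
            {
                emptyRows++; // the caller decides what a dropped row means
                return false;
            }
            return true;
        }

        public static void main(String[] args) throws IOException
        {
            DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
            append((pos, o) -> null, out);         // row reduced to expired tombstones
            append((pos, o) -> new Object(), out); // row with live data
            System.out.println("empty rows: " + emptyRows); // prints: empty rows: 1
        }
    }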
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..d7c2e4d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -27,7 +27,6 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
@@ -51,9 +50,6 @@ public class CompactionController
public final int gcBefore;
public final int mergeShardBefore;
- /**
- * Constructor that subclasses may use when overriding shouldPurge to not need overlappingTree
- */
protected CompactionController(ColumnFamilyStore cfs, int maxValue)
{
this(cfs, null, maxValue);
@@ -153,25 +149,24 @@ public class CompactionController
}
/**
- * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
- * are included in the compaction set
+ * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
+ * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
+ * in other sstables.
*/
- public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+ public long maxPurgeableTimestamp(DecoratedKey key)
{
List<SSTableReader> filteredSSTables = overlappingTree.search(key);
+ long min = Long.MAX_VALUE;
for (SSTableReader sstable : filteredSSTables)
{
- if (sstable.getMinTimestamp() <= maxDeletionTimestamp)
- {
- // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
- // we check index file instead.
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
- return false;
- else if (sstable.getBloomFilter().isPresent(key.key))
- return false;
- }
+ // if we don't have a bloom filter (bf_fp_chance=1.0 or the filter file is missing),
+ // we check the index file instead.
+ if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
+ min = Math.min(min, sstable.getMinTimestamp());
+ else if (sstable.getBloomFilter().isPresent(key.key))
+ min = Math.min(min, sstable.getMinTimestamp());
}
- return true;
+ return min;
}
public void invalidateCachedRow(DecoratedKey key)
@@ -179,35 +174,6 @@ public class CompactionController
cfs.invalidateCachedRow(key);
}
- /**
- * @return an AbstractCompactedRow implementation to write the merged rows in question.
- *
- * If there is a single source row, the data is from a current-version sstable, we don't
- * need to purge and we aren't forcing deserialization for scrub, write it unchanged.
- * Otherwise, we deserialize, purge tombstones, and reserialize in the latest version.
- */
- public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
- {
- long rowSize = 0;
- for (SSTableIdentityIterator row : rows)
- rowSize += row.dataSize;
-
- if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
- {
- String keyString = cfs.metadata.getKeyValidator().getString(rows.get(0).getKey().key);
- logger.info(String.format("Compacting large row %s/%s:%s (%d bytes) incrementally",
- cfs.keyspace.getName(), cfs.name, keyString, rowSize));
- return new LazilyCompactedRow(this, rows);
- }
- return new PrecompactedRow(this, rows);
- }
-
- /** convenience method for single-sstable compactions */
- public AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row)
- {
- return getCompactedRow(Collections.singletonList(row));
- }
-
public void close()
{
SSTableReader.releaseReferences(overlappingSSTables);
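The shift from a boolean shouldPurge to maxPurgeableTimestamp is the conceptual heart of this commit, so a small self-contained model may help. The bloom-filter/index probing is reduced here to a precomputed list of minTimestamps from overlapping sstables that might still hold the key; the names are illustrative, not Cassandra's API.

    import java.util.Arrays;
    import java.util.List;

    public class PurgeSketch
    {
        // min timestamp among overlapping sstables that may still contain the key;
        // only tombstones older than this cannot shadow data outside the compaction.
        static long maxPurgeableTimestamp(List<Long> overlappingMinTimestamps)
        {
            long min = Long.MAX_VALUE;
            for (long ts : overlappingMinTimestamps)
                min = Math.min(min, ts);
            return min;
        }

        public static void main(String[] args)
        {
            // A tombstone at timestamp 10 is being compacted, but an sstable outside
            // the compaction set has minTimestamp 5: it might hold older data the
            // tombstone still needs to suppress, so the tombstone must be kept.
            long tombstoneTimestamp = 10;
            long purgeBefore = maxPurgeableTimestamp(Arrays.asList(5L, 42L));
            System.out.println(tombstoneTimestamp < purgeBefore ? "purge" : "keep"); // prints: keep
        }
    }

This is the comparison LazilyCompactedRow now makes in its constructor (below): rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp decides whether gcBefore applies or is overridden to Integer.MIN_VALUE.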
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 866907b..132cf25 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import com.google.common.collect.ImmutableList;
+
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.utils.CloseableIterator;
@@ -28,7 +30,6 @@ import org.apache.cassandra.utils.MergeIterator;
public class CompactionIterable extends AbstractCompactionIterable
{
-
private static final Comparator<OnDiskAtomIterator> comparator = new Comparator<OnDiskAtomIterator>()
{
public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2)
@@ -54,11 +55,11 @@ public class CompactionIterable extends AbstractCompactionIterable
protected class Reducer extends MergeIterator.Reducer<OnDiskAtomIterator, AbstractCompactedRow>
{
- protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+ protected final List<OnDiskAtomIterator> rows = new ArrayList<>();
public void reduce(OnDiskAtomIterator current)
{
- rows.add((SSTableIdentityIterator) current);
+ rows.add(current);
}
protected AbstractCompactedRow getReduced()
@@ -71,7 +72,7 @@ public class CompactionIterable extends AbstractCompactionIterable
// create a new container for rows, since we're going to clear ours for the next one,
// and the AbstractCompactionRow code should be able to assume that the collection it receives
// won't be pulled out from under it.
- return controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
+ return new LazilyCompactedRow(controller, ImmutableList.copyOf(rows));
}
finally
{
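The comment in getReduced() is guarding against aliasing: the reducer clears and refills one rows list per key, so the row list handed to LazilyCompactedRow must be a snapshot. A minimal illustration of the bug being avoided, using Guava's ImmutableList as the diff now does:

    import java.util.ArrayList;
    import java.util.List;

    import com.google.common.collect.ImmutableList;

    public class CopySketch
    {
        public static void main(String[] args)
        {
            List<String> reused = new ArrayList<>();
            reused.add("row1");

            List<String> snapshot = ImmutableList.copyOf(reused); // safe hand-off
            List<String> aliased = reused;                        // the bug being avoided

            reused.clear(); // reducer resets for the next key

            System.out.println(snapshot); // [row1]
            System.out.println(aliased);  // [] -- pulled out from under the consumer
        }
    }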
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b14c313..966e471 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -579,7 +579,7 @@ public class CompactionManager implements CompactionManagerMBean
row = cleanupStrategy.cleanup(row);
if (row == null)
continue;
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
if (writer.append(compactedRow) != null)
totalkeysWritten++;
}
@@ -905,7 +905,7 @@ public class CompactionManager implements CompactionManagerMBean
}
@Override
- public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+ public long maxPurgeableTimestamp(DecoratedKey key)
{
/*
* The main reason we always purge is that including gcable tombstone would mean that the
@@ -918,7 +918,7 @@ public class CompactionManager implements CompactionManagerMBean
* a tombstone that could shadow a column in another sstable, but this is doubly not a concern
* since validation compaction is read-only.
*/
- return true;
+ return Long.MAX_VALUE;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1ea18e9..bb07be8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -123,9 +123,7 @@ public class CompactionTask extends AbstractCompactionTask
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
- ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
- : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
+ AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..2cb014a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Iterators;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.ColumnNameHelper;
import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.io.sstable.SSTable;
@@ -47,18 +46,18 @@ import org.apache.cassandra.utils.StreamingHistogram;
* in memory at a time is the bloom filter, the index, and one column from each
* pre-compaction row.
*/
-public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable<OnDiskAtom>
+public class LazilyCompactedRow extends AbstractCompactedRow
{
private final List<? extends OnDiskAtomIterator> rows;
private final CompactionController controller;
- private final boolean shouldPurge;
- private ColumnFamily emptyColumnFamily;
- private Reducer reducer;
+ private final long maxPurgeableTimestamp;
+ private final ColumnFamily emptyColumnFamily;
private ColumnStats columnStats;
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
- private long maxDelTimestamp;
+ private final Reducer reducer;
+ private final Iterator<OnDiskAtom> merger;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@ -67,18 +66,39 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- maxDelTimestamp = Long.MIN_VALUE;
+ ColumnFamily rawCf = null;
for (OnDiskAtomIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
- maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
- if (emptyColumnFamily == null)
- emptyColumnFamily = cf;
+ if (rawCf == null)
+ rawCf = cf;
else
- emptyColumnFamily.delete(cf);
+ rawCf.delete(cf);
}
- this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+ maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
+ // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+ // to get rid of redundant row-level and range tombstones
+ assert rawCf != null;
+ int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE;
+ ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+ emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+
+ reducer = new Reducer();
+ merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
+ }
+
+ public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+ {
+ // We should only gc tombstones if shouldPurge == true. But otherwise,
+ // it is still ok to collect columns that are shadowed by their (deleted)
+ // container, which removeDeleted(cf, Integer.MIN_VALUE) will do
+ ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
+ shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
+ controller.cfs.indexManager.updaterFor(key));
+ if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
+ CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
+ return compacted;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,12 +109,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
try
{
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- columnsIndex = indexBuilder.build(this);
+ columnsIndex = indexBuilder.buildForCompaction(merger);
if (columnsIndex.columnsIndex.isEmpty())
{
- boolean cfIrrelevant = shouldPurge
- ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
- : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
+ boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
+ ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
+ : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
if (cfIrrelevant)
return null;
}
@@ -104,17 +124,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
throw new RuntimeException(e);
}
// reach into the reducer (created during iteration) to get column count, size, max column timestamp
- // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
- columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
- reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
- reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
- reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
- reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
- reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
+ columnStats = new ColumnStats(reducer.columns,
+ reducer.minTimestampSeen,
+ Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen),
+ reducer.maxLocalDeletionTimeSeen,
+ reducer.tombstones,
+ reducer.minColumnNameSeen,
+ reducer.maxColumnNameSeen
);
- reducer = null;
+ indexBuilder.maybeWriteEmptyRowHeader();
out.writeShort(SSTableWriter.END_OF_ROW);
close();
@@ -142,24 +161,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- Iterator<OnDiskAtom> iter = iterator();
- while (iter.hasNext())
- iter.next().updateDigest(digest);
+ while (merger.hasNext())
+ merger.next().updateDigest(digest);
close();
}
- public AbstractType<?> getComparator()
- {
- return emptyColumnFamily.getComparator();
- }
-
- public Iterator<OnDiskAtom> iterator()
- {
- reducer = new Reducer();
- Iterator<OnDiskAtom> iter = MergeIterator.get(rows, getComparator().onDiskAtomComparator, reducer);
- return Iterators.filter(iter, Predicates.notNull());
- }
-
public ColumnStats columnStats()
{
return columnStats;
@@ -239,7 +245,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
else
{
- ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+ boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
+ ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
@@ -248,7 +255,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
Column reduced = purged.iterator().next();
container.clear();
- // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
+ // removeDeletedAndOldShards has only checked the top-level CF deletion times,
// not the range tombstone. For that we use the columnIndexer tombstone tracker.
if (indexBuilder.tombstoneTracker().isDeleted(reduced))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
deleted file mode 100644
index 8a74fea..0000000
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.*;
-
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.utils.*;
-
-/**
- * A class to run compaction taking advantage of multiple-core processes:
- *
- * One Deserializer thread per input sstable performs read + deserialize (a row at a time).
- * The resulting ColumnFamilies are added to a queue, which is fed to the merge Reducer.
- *
- * The merge Reducer creates MergeTasks on a thread-per-core Executor, and returns AsyncPrecompactedRow objects.
- *
- * The main complication is in handling larger-than-memory rows. When one is encountered, no further deserialization
- * is done until that row is merged and written -- creating a pipeline stall, as it were. Thus, this is intended
- * to be useful with mostly-in-memory row sizes, but preserves correctness in the face of occasional exceptions.
- */
-public class ParallelCompactionIterable extends AbstractCompactionIterable
-{
- private static final Logger logger = LoggerFactory.getLogger(ParallelCompactionIterable.class);
-
- private final int maxInMemorySize;
-
- public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller)
- {
- this(type, scanners, controller, DatabaseDescriptor.getInMemoryCompactionLimit() / scanners.size());
- }
-
- public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, int maxInMemorySize)
- {
- super(controller, type, scanners);
- this.maxInMemorySize = maxInMemorySize;
- }
-
- public CloseableIterator<AbstractCompactedRow> iterator()
- {
- List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>(scanners.size());
- for (ICompactionScanner scanner : scanners)
- sources.add(new Deserializer(scanner, maxInMemorySize));
- return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, new Reducer()));
- }
-
- private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
- {
- private final CloseableIterator<CompactedRowContainer> reducer;
-
- public Unwrapper(CloseableIterator<CompactedRowContainer> reducer)
- {
- this.reducer = reducer;
- }
-
- protected AbstractCompactedRow computeNext()
- {
- if (!reducer.hasNext())
- return endOfData();
-
- CompactedRowContainer container = reducer.next();
- AbstractCompactedRow compactedRow;
- compactedRow = container.future == null
- ? container.row
- : new PrecompactedRow(container.key, FBUtilities.waitOnFuture(container.future));
-
- return compactedRow;
- }
-
- public void close() throws IOException
- {
- reducer.close();
- }
- }
-
- private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer>
- {
- private final List<RowContainer> rows = new ArrayList<RowContainer>();
-
- private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
- Integer.MAX_VALUE,
- TimeUnit.MILLISECONDS,
- new SynchronousQueue<Runnable>(),
- new NamedThreadFactory("CompactionReducer"));
-
- public void reduce(RowContainer current)
- {
- rows.add(current);
- }
-
- protected CompactedRowContainer getReduced()
- {
- assert rows.size() > 0;
-
- ParallelCompactionIterable.this.updateCounterFor(rows.size());
- CompactedRowContainer compacted = getCompactedRow(rows);
- rows.clear();
- long n = 0;
- for (ICompactionScanner scanner : scanners)
- n += scanner.getCurrentPosition();
- bytesRead = n;
- return compacted;
- }
-
- public CompactedRowContainer getCompactedRow(List<RowContainer> rows)
- {
- boolean inMemory = true;
- for (RowContainer container : rows)
- {
- if (container.row == null)
- {
- inMemory = false;
- break;
- }
- }
-
- if (inMemory)
- {
- // caller will re-use rows List, so make ourselves a copy
- List<Row> rawRows = new ArrayList<Row>(rows.size());
- for (RowContainer rowContainer : rows)
- rawRows.add(rowContainer.row);
- return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(rawRows)));
- }
-
- List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(rows.size());
- for (RowContainer container : rows)
- iterators.add(container.row == null ? container.wrapper : new DeserializedColumnIterator(container.row));
- return new CompactedRowContainer(new LazilyCompactedRow(controller, iterators));
- }
-
- public void close()
- {
- executor.shutdown();
- }
-
- /**
- * Merges a set of in-memory rows
- */
- private class MergeTask implements Callable<ColumnFamily>
- {
- private final List<Row> rows;
-
- public MergeTask(List<Row> rows)
- {
- this.rows = rows;
- }
-
- public ColumnFamily call() throws Exception
- {
- final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-
- List<CloseableIterator<Column>> data = new ArrayList<CloseableIterator<Column>>(rows.size());
- for (Row row : rows)
- {
- returnCF.delete(row.cf);
- data.add(FBUtilities.closeableIterator(row.cf.iterator()));
- }
-
- PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
- return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
- }
- }
-
- private class DeserializedColumnIterator implements OnDiskAtomIterator
- {
- private final Row row;
- private final Iterator<Column> iter;
-
- public DeserializedColumnIterator(Row row)
- {
- this.row = row;
- iter = row.cf.iterator();
- }
-
- public ColumnFamily getColumnFamily()
- {
- return row.cf;
- }
-
- public DecoratedKey getKey()
- {
- return row.key;
- }
-
- public void close() throws IOException {}
-
- public boolean hasNext()
- {
- return iter.hasNext();
- }
-
- public OnDiskAtom next()
- {
- return iter.next();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
- }
-
- private static class Deserializer extends AbstractIterator<RowContainer> implements CloseableIterator<RowContainer>
- {
- private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
- private static final RowContainer finished = new RowContainer((Row) null);
- private final ICompactionScanner scanner;
-
- public Deserializer(ICompactionScanner ssts, final int maxInMemorySize)
- {
- this.scanner = ssts;
- Runnable runnable = new WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- SimpleCondition condition = null;
- while (true)
- {
- if (condition != null)
- {
- condition.await();
- condition = null;
- }
- if (!scanner.hasNext())
- {
- queue.put(finished);
- break;
- }
-
- SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next();
- if (iter.dataSize > maxInMemorySize)
- {
- logger.debug("parallel lazy deserialize from {}", iter.getPath());
- condition = new SimpleCondition();
- queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition)));
- }
- else
- {
- logger.debug("parallel eager deserialize from {}", iter.getPath());
- queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory))));
- }
- }
- }
- };
- new Thread(runnable, "Deserialize " + scanner.getBackingFiles()).start();
- }
-
- protected RowContainer computeNext()
- {
- RowContainer container;
- try
- {
- container = queue.take();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- return container == finished ? endOfData() : container;
- }
-
- public void close() throws IOException
- {
- scanner.close();
- }
- }
-
- /**
- * a wrapper around SSTII that notifies the given condition when it is closed
- */
- private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
- {
- private final SSTableIdentityIterator wrapped;
- private final SimpleCondition condition;
-
- public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, SimpleCondition condition)
- {
- this.wrapped = wrapped;
- this.condition = condition;
- }
-
- public ColumnFamily getColumnFamily()
- {
- return wrapped.getColumnFamily();
- }
-
- public DecoratedKey getKey()
- {
- return wrapped.getKey();
- }
-
- public void close() throws IOException
- {
- try
- {
- wrapped.close();
- }
- finally
- {
- condition.signalAll();
- }
- }
-
- public boolean hasNext()
- {
- return wrapped.hasNext();
- }
-
- public OnDiskAtom next()
- {
- return wrapped.next();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class RowContainer
- {
- // either row is not null, or wrapper is not null. But not both.
- public final Row row;
- public final NotifyingSSTableIdentityIterator wrapper;
- public static final Comparator<RowContainer> comparator = new Comparator<RowContainer>()
- {
- public int compare(RowContainer o1, RowContainer o2)
- {
- return o1.getKey().compareTo(o2.getKey());
- }
- };
-
- private RowContainer(Row row)
- {
- this.row = row;
- wrapper = null;
- }
-
- public RowContainer(NotifyingSSTableIdentityIterator wrapper)
- {
- this.wrapper = wrapper;
- row = null;
- }
-
- public DecoratedKey getKey()
- {
- return row == null ? wrapper.getKey() : row.key;
- }
- }
-
- private static class CompactedRowContainer
- {
- public final DecoratedKey key;
- /** either "future" or "row" will be not-null, but not both at once. */
- public final Future<ColumnFamily> future;
- public final LazilyCompactedRow row;
-
- private CompactedRowContainer(DecoratedKey key, Future<ColumnFamily> future)
- {
- this.key = key;
- this.future = future;
- row = null;
- }
-
- private CompactedRowContainer(LazilyCompactedRow row)
- {
- this.row = row;
- future = null;
- key = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
deleted file mode 100644
index 15ae0b8..0000000
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MergeIterator;
-
-/**
- * PrecompactedRow merges its rows in its constructor in memory.
- */
-public class PrecompactedRow extends AbstractCompactedRow
-{
- private final ColumnFamily compactedCf;
-
- /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
- public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
- {
- super(key);
- compactedCf = cf;
- }
-
- public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
- {
- assert key != null;
- assert controller != null;
- assert cf != null;
-
- // avoid calling shouldPurge unless we actually need to: it can be very expensive if LCS
- // gets behind and has hundreds of overlapping L0 sstables. Essentially, this method is an
- // ugly refactor of removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf),
- // taking this into account.
- Boolean shouldPurge = null;
-
- if (cf.hasIrrelevantData(controller.gcBefore))
- shouldPurge = controller.shouldPurge(key, cf.maxTimestamp());
-
- // We should only gc tombstone if shouldPurge == true. But otherwise,
- // it is still ok to collect column that shadowed by their (deleted)
- // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
- ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
-
- if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
- {
- if (shouldPurge == null)
- shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
- if (shouldPurge)
- CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
- }
-
- return compacted;
- }
-
- public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
- {
- // See comment in preceding method
- ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
- shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
- controller.cfs.indexManager.updaterFor(key));
- if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
- CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
- return compacted;
- }
-
- public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
- {
- this(rows.get(0).getKey(),
- removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows, controller)));
- }
-
- private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller)
- {
- assert !rows.isEmpty();
-
- final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-
- // transform into iterators that MergeIterator will like, and apply row-level tombstones
- List<CloseableIterator<Column>> data = new ArrayList<>(rows.size());
- for (SSTableIdentityIterator row : rows)
- {
- ColumnFamily cf = row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory);
- returnCF.delete(cf);
- data.add(FBUtilities.closeableIterator(cf.iterator()));
- }
-
- merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey()));
-
- return returnCF;
- }
-
- // returnCF should already have row-level tombstones applied
- public static void merge(final ColumnFamily returnCF, List<CloseableIterator<Column>> data, final SecondaryIndexManager.Updater indexer)
- {
- IDiskAtomFilter filter = new IdentityQueryFilter();
- Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
-
- MergeIterator.Reducer<Column, Column> reducer = new MergeIterator.Reducer<Column, Column>()
- {
- ColumnFamily container = returnCF.cloneMeShallow();
-
- public void reduce(Column column)
- {
- container.addColumn(column);
-
- // skip the index-update checks if there is no indexing needed since they are a bit expensive
- if (indexer == SecondaryIndexManager.nullUpdater)
- return;
-
- // notify the index that the column has been overwritten if the value being reduced has been
- // superceded by another directly, or indirectly by a range tombstone
- if ((!column.isMarkedForDelete(System.currentTimeMillis()) && !container.getColumn(column.name()).equals(column))
- || returnCF.deletionInfo().isDeleted(column.name(), CompactionManager.NO_GC))
- {
- indexer.remove(column);
- }
- }
-
- protected Column getReduced()
- {
- Column c = container.iterator().next();
- container.clear();
- return c;
- }
- };
-
- Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer);
- filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC, System.currentTimeMillis());
- }
-
- public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
- {
- if (compactedCf == null)
- return null;
-
- return SSTableWriter.rawAppend(compactedCf, currentPosition, key, out);
- }
-
- public void update(MessageDigest digest)
- {
- assert compactedCf != null;
- DataOutputBuffer buffer = new DataOutputBuffer();
- try
- {
- DeletionTime.serializer.serialize(compactedCf.deletionInfo().getTopLevelDeletion(), buffer);
- digest.update(buffer.getData(), 0, buffer.getLength());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- compactedCf.updateDigest(digest);
- }
-
- public ColumnStats columnStats()
- {
- return compactedCf.getColumnStats();
- }
-
- /**
- * @return the full column family represented by this compacted row.
- *
- * We do not provide this method for other AbstractCompactedRow, because this fits the whole row into
- * memory and don't make sense for those other implementations.
- */
- public ColumnFamily getFullColumnFamily()
- {
- return compactedCf;
- }
-
- public void close() { }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index d0aafa4..a14ab43 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -96,9 +96,9 @@ public class SSTableSplitter {
}
@Override
- public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+ public long maxPurgeableTimestamp(DecoratedKey key)
{
- return false;
+ return Long.MIN_VALUE;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e435c24..97daff4 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
private final OutputHandler outputHandler;
- private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+ private static final Comparator<Row> rowComparator = new Comparator<Row>()
{
- public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+ public int compare(Row r1, Row r2)
{
return r1.key.compareTo(r2.key);
}
};
- private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+ private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
{
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
public void scrub()
{
- outputHandler.output("Scrubbing " + sstable);
+ outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
// TODO errors when creating the writer may leave empty temp files.
writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- AbstractCompactedRow prevRow = null;
+ DecoratedKey prevKey = null;
while (!dataFile.isEOF())
{
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
if (dataSize > dataFile.length())
throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
outputHandler.warn("Index file contained a different key or row size; using key from data file");
}
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
key = sstable.partitioner.decorateKey(currentIndexKey);
try
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
}
catch (Throwable th2)
{
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
if (!outOfOrderRows.isEmpty())
{
SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- for (AbstractCompactedRow row : outOfOrderRows)
- inOrderWriter.append(row);
+ for (Row row : outOfOrderRows)
+ inOrderWriter.append(row.key, row.cf);
newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
}
}
+ private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+ {
+ // TODO bitch if the row is too large? if it is there's not much we can do ...
+ outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
+ // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+ ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+ while (atoms.hasNext())
+ {
+ OnDiskAtom atom = atoms.next();
+ cf.addAtom(atom);
+ }
+ outOfOrderRows.add(new Row(key, cf));
+ }
+
public SSTableReader getNewSSTable()
{
return newSstable;
@@ -356,9 +370,9 @@ public class Scrubber implements Closeable
}
@Override
- public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+ public long maxPurgeableTimestamp(DecoratedKey key)
{
- return false;
+ return Long.MIN_VALUE;
}
}
}
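As a footnote to the new saveOutOfOrderRow() path, a self-contained sketch of the salvage idea: keys that sort before the previous good key are parked in a sorted set and written, in order, to a separate output at the end, mirroring the newInOrderSstable handling above. The String keys here stand in for DecoratedKey/Row.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class OutOfOrderSketch
    {
        public static void main(String[] args)
        {
            String[] scanned = { "a", "c", "b", "d" };   // "b" arrives out of order

            SortedSet<String> outOfOrder = new TreeSet<>();
            List<String> inOrderOutput = new ArrayList<>();

            String prevKey = null;
            for (String key : scanned)
            {
                if (prevKey != null && prevKey.compareTo(key) > 0)
                {
                    outOfOrder.add(key);   // like saveOutOfOrderRow() in the diff above
                    continue;              // note: prevKey stays at the last good key
                }
                inOrderOutput.add(key);
                prevKey = key;
            }

            // like the new in-order sstable written at the end of scrub()
            List<String> salvaged = new ArrayList<>(outOfOrder);

            System.out.println(inOrderOutput); // [a, c, d]
            System.out.println(salvaged);      // [b]
        }
    }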
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index fa21765..b98c2ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -151,9 +151,9 @@ public class Upgrader
}
@Override
- public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+ public long maxPurgeableTimestamp(DecoratedKey key)
{
- return false;
+ return Long.MIN_VALUE;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 1fa2912..e7b9e11 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -71,7 +71,7 @@ public class IndexSummaryBuilder
public IndexSummary build(IPartitioner partitioner)
{
- assert keys != null && keys.size() > 0;
+ assert keys.size() > 0;
assert keys.size() == positions.size();
Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
columnIndexer.add(atom); // This write the atom on disk too
}
- columnIndexer.finish();
+ columnIndexer.maybeWriteEmptyRowHeader();
dataFile.stream.writeShort(END_OF_ROW);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 48c0b3c..18e637b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -39,6 +39,8 @@ import org.apache.cassandra.Util;
import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
+import static org.junit.Assert.assertFalse;
+
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -144,45 +146,50 @@ public class CompactionsPurgeTest extends SchemaLoader
// verify that minor compaction still GCs when the key is present
// in a non-compacted sstable, but the timestamp ensures we won't miss anything
cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
- Assert.assertEquals(1, cf.getColumnCount());
+ assertEquals(1, cf.getColumnCount());
}
+ /**
+ * verify that a minor compaction does not drop tombstones that might still be relevant
+ */
@Test
public void testMinTimestampPurge() throws IOException, ExecutionException, InterruptedException
{
- // verify that we don't drop tombstones during a minor compaction that might still be relevant
CompactionManager.instance.disableAutoCompaction();
+
Keyspace keyspace = Keyspace.open(KEYSPACE2);
String cfName = "Standard1";
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
RowMutation rm;
DecoratedKey key3 = Util.dk("key3");
+
// inserts
rm = new RowMutation(KEYSPACE2, key3.key);
rm.add(cfName, ByteBufferUtil.bytes("c1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
rm.add(cfName, ByteBufferUtil.bytes("c2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
rm.apply();
cfs.forceBlockingFlush();
- // deletes
+ // delete c1
rm = new RowMutation(KEYSPACE2, key3.key);
rm.delete(cfName, ByteBufferUtil.bytes("c1"), 10);
rm.apply();
cfs.forceBlockingFlush();
Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
- // delete so we have new delete in a diffrent SST.
+ // delete c2 so we have the new delete in a different SSTable
rm = new RowMutation(KEYSPACE2, key3.key);
rm.delete(cfName, ByteBufferUtil.bytes("c2"), 9);
rm.apply();
cfs.forceBlockingFlush();
+
+ // compact the sstables with the c1/c2 data and the c1 tombstone
cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
- // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
- // so it would be invalid to assume we can throw out the c1 entry.
+ // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
+ // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.
ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
- Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
- Assert.assertEquals(2, cf.getColumnCount());
+ assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
+ assertEquals(2, cf.getColumnCount());
}
@Test
@@ -279,16 +286,13 @@ public class CompactionsPurgeTest extends SchemaLoader
String cfName = "Standard1";
Keyspace keyspace = Keyspace.open(keyspaceName);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
DecoratedKey key = Util.dk("key3");
RowMutation rm;
// inserts
rm = new RowMutation(keyspaceName, key.key);
for (int i = 0; i < 10; i++)
- {
rm.add(cfName, ByteBufferUtil.bytes(String.valueOf(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
- }
rm.apply();
// deletes row with timestamp such that not all columns are deleted
@@ -303,12 +307,10 @@ public class CompactionsPurgeTest extends SchemaLoader
// re-inserts with timestamp lower than delete
rm = new RowMutation(keyspaceName, key.key);
for (int i = 0; i < 5; i++)
- {
rm.add(cfName, ByteBufferUtil.bytes(String.valueOf(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
- }
rm.apply();
- // Check that the second insert did went in
+ // Check that the second insert went in
ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
assertEquals(10, cf.getColumnCount());
for (Column c : cf)
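testMinTimestampPurge above encodes the overlap rule its assertions depend on: a tombstone may be dropped only when it is older than the minimum timestamp of every sstable outside the compaction set that still contains the key. A standalone illustration of that rule (assumed names, not Cassandra code):

    import java.util.List;

    public class MinTimestampPurgeSketch
    {
        static boolean canDropTombstone(long tombstoneTimestamp, List<Long> minTimestampsOutsideCompaction)
        {
            for (long min : minTimestampsOutsideCompaction)
                if (min <= tombstoneTimestamp)
                    return false; // an overlapping sstable may hold data the tombstone still shadows
            return true;
        }

        public static void main(String[] args)
        {
            // the c1 tombstone was written at 10; the sstable holding the c2 tombstone
            // sits outside the compaction set with min timestamp 9, so c1 must be kept
            System.out.println(canDropTombstone(10, List.of(9L)));  // false
            System.out.println(canDropTombstone(10, List.of(11L))); // true
        }
    }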
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
deleted file mode 100644
index 1c286c8..0000000
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import com.google.common.base.Objects;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.MappedFileDataInput;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CloseableIterator;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class LazilyCompactedRowTest extends SchemaLoader
-{
- private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
- {
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- Collection<SSTableReader> sstables = cfs.getSSTables();
-
- // compare eager and lazy compactions
- AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
- strategy.getScanners(sstables),
- new PreCompactingController(cfs, sstables, gcBefore));
- AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
- strategy.getScanners(sstables),
- new LazilyCompactingController(cfs, sstables, gcBefore));
- assertBytes(cfs, eager, lazy);
-
- // compare eager and parallel-lazy compactions
- eager = new CompactionIterable(OperationType.UNKNOWN,
- strategy.getScanners(sstables),
- new PreCompactingController(cfs, sstables, gcBefore));
- AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
- strategy.getScanners(sstables),
- new CompactionController(cfs, new HashSet<SSTableReader>(sstables), gcBefore),
- 0);
- assertBytes(cfs, eager, parallel);
- }
-
- private static void assertBytes(ColumnFamilyStore cfs, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
- {
- CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
- CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
-
- while (true)
- {
- if (!iter1.hasNext())
- {
- assert !iter2.hasNext();
- break;
- }
-
- AbstractCompactedRow row1 = iter1.next();
- AbstractCompactedRow row2 = iter2.next();
- DataOutputBuffer out1 = new DataOutputBuffer();
- DataOutputBuffer out2 = new DataOutputBuffer();
- row1.write(-1, out1);
- row2.write(-1, out2);
-
- File tmpFile1 = File.createTempFile("lcrt1", null);
- File tmpFile2 = File.createTempFile("lcrt2", null);
-
- tmpFile1.deleteOnExit();
- tmpFile2.deleteOnExit();
-
- try (OutputStream os1 = new FileOutputStream(tmpFile1);
- OutputStream os2 = new FileOutputStream(tmpFile2))
- {
- os1.write(out1.getData()); // writing data from row1
- os2.write(out2.getData()); // writing data from row2
- }
-
- try (MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0);
- MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0))
- {
- // row key
- assertEquals(ByteBufferUtil.readWithShortLength(in1), ByteBufferUtil.readWithShortLength(in2));
-
- // cf metadata
- ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
- ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
- cf1.delete(DeletionTime.serializer.deserialize(in1));
- cf2.delete(DeletionTime.serializer.deserialize(in2));
- assertEquals(cf1.deletionInfo(), cf2.deletionInfo());
- // columns
- while (true)
- {
- Column c1 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT);
- Column c2 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT);
- assert Objects.equal(c1, c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
- if (c1 == null)
- break;
- }
- // that should be everything
- assert in1.available() == 0;
- assert in2.available() == 0;
- }
- }
- }
-
- private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws NoSuchAlgorithmException
- {
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- Collection<SSTableReader> sstables = cfs.getSSTables();
- AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore));
- AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore));
- CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
- CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
-
- while (true)
- {
- if (!iter1.hasNext())
- {
- assert !iter2.hasNext();
- break;
- }
-
- AbstractCompactedRow row1 = iter1.next();
- AbstractCompactedRow row2 = iter2.next();
- MessageDigest digest1 = MessageDigest.getInstance("MD5");
- MessageDigest digest2 = MessageDigest.getInstance("MD5");
-
- row1.update(digest1);
- row2.update(digest2);
-
- assert MessageDigest.isEqual(digest1.digest(), digest2.digest());
- }
- }
-
- @Test
- public void testOneRow() throws IOException, NoSuchAlgorithmException
- {
- CompactionManager.instance.disableAutoCompaction();
-
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
- ByteBuffer key = ByteBufferUtil.bytes("k");
- RowMutation rm = new RowMutation("Keyspace1", key);
- rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
- rm.apply();
- cfs.forceBlockingFlush();
-
- assertBytes(cfs, Integer.MAX_VALUE);
- assertDigest(cfs, Integer.MAX_VALUE);
- }
-
- @Test
- public void testOneRowTwoColumns() throws IOException, NoSuchAlgorithmException
- {
- CompactionManager.instance.disableAutoCompaction();
-
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
- ByteBuffer key = ByteBufferUtil.bytes("k");
- RowMutation rm = new RowMutation("Keyspace1", key);
- rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
- rm.add("Standard1", ByteBufferUtil.bytes("d"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
- rm.apply();
- cfs.forceBlockingFlush();
-
- assertBytes(cfs, Integer.MAX_VALUE);
- assertDigest(cfs, Integer.MAX_VALUE);
- }
-
- @Test
- public void testOneRowManyColumns() throws IOException, NoSuchAlgorithmException
- {
- CompactionManager.instance.disableAutoCompaction();
-
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
- ByteBuffer key = ByteBuffer.wrap("k".getBytes());
- RowMutation rm = new RowMutation("Keyspace1", key);
- for (int i = 0; i < 1000; i++)
- rm.add("Standard1", ByteBufferUtil.bytes(i), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
- rm.apply();
- DataOutputBuffer out = new DataOutputBuffer();
- RowMutation.serializer.serialize(rm, out, MessagingService.current_version);
- assert out.getLength() > DatabaseDescriptor.getColumnIndexSize();
- cfs.forceBlockingFlush();
-
- assertBytes(cfs, Integer.MAX_VALUE);
- assertDigest(cfs, Integer.MAX_VALUE);
- }
-
- @Test
- public void testTwoRows() throws IOException, NoSuchAlgorithmException
- {
- CompactionManager.instance.disableAutoCompaction();
-
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
- ByteBuffer key = ByteBufferUtil.bytes("k");
- RowMutation rm = new RowMutation("Keyspace1", key);
- rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
- rm.apply();
- cfs.forceBlockingFlush();
-
- rm.apply();
- cfs.forceBlockingFlush();
-
- assertBytes(cfs, Integer.MAX_VALUE);
- assertDigest(cfs, Integer.MAX_VALUE);
- }
-
- @Test
- public void testTwoRowsTwoColumns() throws IOException, NoSuchAlgorithmException
- {
- CompactionManager.instance.disableAutoCompaction();
-
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
- ByteBuffer key = ByteBufferUtil.bytes("k");
- RowMutation rm = new RowMutation("Keyspace1", key);
- rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
- rm.add("Standard1", ByteBufferUtil.bytes("d"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
- rm.apply();
- cfs.forceBlockingFlush();
-
- rm.apply();
- cfs.forceBlockingFlush();
-
- assertBytes(cfs, Integer.MAX_VALUE);
- assertDigest(cfs, Integer.MAX_VALUE);
- }
-
- @Test
- public void testManyRows() throws IOException, NoSuchAlgorithmException
- {
- CompactionManager.instance.disableAutoCompaction();
-
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
- final int ROWS_PER_SSTABLE = 10;
- for (int j = 0; j < (cfs.metadata.getIndexInterval() * 3) / ROWS_PER_SSTABLE; j++)
- {
- for (int i = 0; i < ROWS_PER_SSTABLE; i++)
- {
- ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(i % 2));
- RowMutation rm = new RowMutation("Keyspace1", key);
- rm.add("Standard1", ByteBufferUtil.bytes(String.valueOf(i / 2)), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i);
- rm.apply();
- }
- cfs.forceBlockingFlush();
- }
-
- assertBytes(cfs, Integer.MAX_VALUE);
- assertDigest(cfs, Integer.MAX_VALUE);
- }
-
- private static class LazilyCompactingController extends CompactionController
- {
- public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
- {
- super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
- }
-
- @Override
- public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
- {
- return new LazilyCompactedRow(this, rows);
- }
- }
-
- private static class PreCompactingController extends CompactionController
- {
- public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
- {
- super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
- }
-
- @Override
- public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
- {
- return new PrecompactedRow(this, rows);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 9fa5d89..757254c 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -17,7 +17,10 @@
*/
package org.apache.cassandra.repair;
+import java.io.DataOutput;
+import java.io.IOException;
import java.net.InetAddress;
+import java.security.MessageDigest;
import java.util.UUID;
import org.junit.After;
@@ -27,11 +30,12 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.TreeMapBackedSortedColumns;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -43,11 +47,7 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.SimpleCondition;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class ValidatorTest extends SchemaLoader
{
@@ -109,8 +109,7 @@ public class ValidatorTest extends SchemaLoader
// add a row
Token mid = partitioner.midpoint(range.left, range.right);
- validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
- TreeMapBackedSortedColumns.factory.create(cfs.metadata)));
+ validator.add(new CompactedRowStub(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!"))));
validator.complete();
// confirm that the tree was validated
@@ -121,6 +120,28 @@ public class ValidatorTest extends SchemaLoader
lock.await();
}
+ private static class CompactedRowStub extends AbstractCompactedRow
+ {
+ private CompactedRowStub(DecoratedKey key)
+ {
+ super(key);
+ }
+
+ public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void update(MessageDigest digest) { }
+
+ public ColumnStats columnStats()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() throws IOException { }
+ }
+
@Test
public void testValidatorFailed() throws Throwable
{
[06/10] git commit: Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274
Posted by jb...@apache.org.
Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4b5b0db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4b5b0db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4b5b0db
Branch: refs/heads/cassandra-2.0
Commit: d4b5b0dbc541b4b6249ccd1b507c777d7fc0bc4f
Parents: 7187a8a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:14:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:14:15 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 22 ++++----
.../db/compaction/AbstractCompactedRow.java | 4 +-
.../db/compaction/CompactionController.java | 2 +-
.../db/compaction/LazilyCompactedRow.java | 34 ++++++++----
.../cassandra/db/compaction/Scrubber.java | 52 ++++++++++++-------
.../cassandra/io/sstable/SSTableWriter.java | 2 +-
test/data/serialization/2.0/db.RowMutation.bin | Bin 3599 -> 3599 bytes
8 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73a2b5..31b944c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
* Compact hottest sstables first and optionally omit coldest from
compaction entirely (CASSANDRA-6109)
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..75a06d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,20 +140,22 @@ public class ColumnIndex
}
ColumnIndex index = build();
- finish();
+ maybeWriteEmptyRowHeader();
return index;
}
- public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+ /**
+ * The important distinction wrt build() is that we may be building for a row that ends up
+ * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+ * columns shadowed by an expired tombstone). Thus, it is the caller's responsibility
+ * to decide whether to write the header for an empty row.
+ */
+ public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
{
- for (OnDiskAtom c : columns)
- add(c);
- ColumnIndex index = build();
-
- finish();
-
- return index;
+ while (columns.hasNext())
+ add(columns.next());
+ return build();
}
public void add(OnDiskAtom column) throws IOException
@@ -219,7 +221,7 @@ public class ColumnIndex
return result;
}
- public void finish() throws IOException
+ public void maybeWriteEmptyRowHeader() throws IOException
{
if (!deletionInfo.isLive())
maybeWriteRowHeader();
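The rename above, finish() to maybeWriteEmptyRowHeader(), names what the method actually does: when every column of a row has been purged, a header is still written if a row-level deletion is present, so the tombstone itself survives the compaction. A simplified standalone sketch of that condition (assumed representation of DeletionInfo):

    public class EmptyRowHeaderSketch
    {
        // stand-in for DeletionInfo.isLive(): true when no row-level tombstone exists
        static boolean deletionIsLive(long markedForDeleteAt)
        {
            return markedForDeleteAt == Long.MIN_VALUE;
        }

        static void maybeWriteEmptyRowHeader(long markedForDeleteAt)
        {
            if (!deletionIsLive(markedForDeleteAt))
                System.out.println("header written: row-level tombstone must survive");
            else
                System.out.println("nothing written: no deletion to preserve");
        }

        public static void main(String[] args)
        {
            maybeWriteEmptyRowHeader(Long.MIN_VALUE); // live row whose columns were all purged
            maybeWriteEmptyRowHeader(1000L);          // deleted row: header is kept
        }
    }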
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
/**
* write the row (size + column index + filter + column data, but NOT row key) to @param out.
- * It is an error to call this if isEmpty is false. (Because the key is appended first,
- * so we'd have an incomplete row written.)
*
* write() may change internal state; it is NOT valid to call write() or update() a second time.
+ *
+ * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
*/
public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
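The javadoc change above turns what used to be an error condition into part of the contract: write() may now return null when compaction leaves nothing worth keeping. A condensed sketch of how a caller can count such rows, mirroring the emptyRows/goodRows bookkeeping in the Scrubber hunks below (names assumed):

    public class NullRowIndexSketch
    {
        // stand-in for AbstractCompactedRow.write(): null means the row compacted away
        static Long write(boolean columnsSurvive, boolean tombstoneSurvives)
        {
            if (!columnsSurvive && !tombstoneSurvives)
                return null;
            return 128L; // stand-in for the row's index entry
        }

        public static void main(String[] args)
        {
            int emptyRows = 0, goodRows = 0;
            for (boolean[] row : new boolean[][] {{ false, false }, { false, true }, { true, true }})
            {
                if (write(row[0], row[1]) == null)
                    emptyRows++;
                else
                    goodRows++;
            }
            System.out.println(emptyRows + " empty, " + goodRows + " good"); // 1 empty, 2 good
        }
    }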
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..65515d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -154,7 +154,7 @@ public class CompactionController
/**
* @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
- * are included in the compaction set
+ * older than @param maxDeletionTimestamp are included in the compaction set
*/
public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..0cdbbb7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -52,13 +52,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
private final List<? extends OnDiskAtomIterator> rows;
private final CompactionController controller;
private final boolean shouldPurge;
- private ColumnFamily emptyColumnFamily;
+ private final ColumnFamily emptyColumnFamily;
private Reducer reducer;
private ColumnStats columnStats;
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
- private long maxDelTimestamp;
+ private long maxTombstoneTimestamp;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@ -67,18 +67,31 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- maxDelTimestamp = Long.MIN_VALUE;
+ ColumnFamily rawCf = null;
+ maxTombstoneTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
- maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
+ maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
- if (emptyColumnFamily == null)
- emptyColumnFamily = cf;
+ if (rawCf == null)
+ rawCf = cf;
else
- emptyColumnFamily.delete(cf);
+ rawCf.delete(cf);
}
- this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+
+ // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
+ // tombstones newer than the row-level tombstones we've seen -- but we won't know that
+ // until we iterate over them. By passing MAX_VALUE we will only purge if there are
+ // no other versions of this row present.
+ this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+
+ // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+ // to get rid of redundant row-level and range tombstones
+ assert rawCf != null;
+ int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+ ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+ emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,7 +102,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
try
{
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- columnsIndex = indexBuilder.build(this);
+ columnsIndex = indexBuilder.buildForCompaction(iterator());
if (columnsIndex.columnsIndex.isEmpty())
{
boolean cfIrrelevant = shouldPurge
@@ -107,7 +120,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+ reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -115,6 +128,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
);
reducer = null;
+ indexBuilder.maybeWriteEmptyRowHeader();
out.writeShort(SSTableWriter.END_OF_ROW);
close();
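The constructor hunk above splits the purge decision in two. First it asks shouldPurge(key, Long.MAX_VALUE), which only succeeds when no sstable outside the compaction holds any version of the row; then removeDeleted runs with gcBefore overridden to Integer.MIN_VALUE whenever purging is unsafe, so redundant shadowed cells still go away while no tombstone is dropped. A standalone sketch of the override (assumed names):

    public class LazyPurgeSketch
    {
        // with Long.MAX_VALUE as the max deletion timestamp, shouldPurge collapses to
        // "no other sstable contains any version of this row"
        static boolean shouldPurge(boolean rowPresentOutsideCompaction)
        {
            return !rowPresentOutsideCompaction;
        }

        // when purging is unsafe, pretend nothing has expired yet: removeDeleted then
        // drops only cells shadowed by row/range tombstones, never the tombstones
        static int effectiveGcBefore(boolean purgeable, int gcBefore)
        {
            return purgeable ? gcBefore : Integer.MIN_VALUE;
        }

        public static void main(String[] args)
        {
            int gcBefore = 1383148800; // stand-in for "now - gc_grace_seconds"
            System.out.println(effectiveGcBefore(shouldPurge(false), gcBefore)); // gcBefore
            System.out.println(effectiveGcBefore(shouldPurge(true), gcBefore));  // Integer.MIN_VALUE
        }
    }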
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index d8a2745..708e929 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
private final OutputHandler outputHandler;
- private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+ private static final Comparator<Row> rowComparator = new Comparator<Row>()
{
- public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+ public int compare(Row r1, Row r2)
{
return r1.key.compareTo(r2.key);
}
};
- private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+ private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
{
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
public void scrub()
{
- outputHandler.output("Scrubbing " + sstable);
+ outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
// TODO errors when creating the writer may leave empty temp files.
writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- AbstractCompactedRow prevRow = null;
+ DecoratedKey prevKey = null;
while (!dataFile.isEOF())
{
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
if (dataSize > dataFile.length())
throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
outputHandler.warn("Index file contained a different key or row size; using key from data file");
}
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
key = sstable.partitioner.decorateKey(currentIndexKey);
try
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
}
catch (Throwable th2)
{
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
if (!outOfOrderRows.isEmpty())
{
SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- for (AbstractCompactedRow row : outOfOrderRows)
- inOrderWriter.append(row);
+ for (Row row : outOfOrderRows)
+ inOrderWriter.append(row.key, row.cf);
newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
}
}
+ private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+ {
+ // TODO complain if the row is too large? if it is, there's not much we can do ...
+ outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ // adding atoms in sorted order is worst-case for TreeMapBackedSortedColumns, but we shouldn't need to do this very often
+ // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+ ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+ while (atoms.hasNext())
+ {
+ OnDiskAtom atom = atoms.next();
+ cf.addAtom(atom);
+ }
+ outOfOrderRows.add(new Row(key, cf));
+ }
+
public SSTableReader getNewSSTable()
{
return newSstable;
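saveOutOfOrderRow above routes the mis-sorted row through TreeMapBackedSortedColumns so its cells come back out in comparator order and can later be appended to a separate in-order writer. The same idea in a self-contained sketch:

    import java.util.Map;
    import java.util.TreeMap;

    public class OutOfOrderSalvageSketch
    {
        public static void main(String[] args)
        {
            // cell names as they appear, mis-sorted, in the corrupt row
            String[] cellNames = { "c3", "c1", "c2" };

            // a TreeMap tolerates any insertion order; sorted insertion is its worst
            // case, but scrubbing out-of-order rows should be rare
            Map<String, String> sorted = new TreeMap<>();
            for (String name : cellNames)
                sorted.put(name, "value-" + name);

            sorted.forEach((name, value) -> System.out.println(name)); // c1, c2, c3
        }
    }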
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
columnIndexer.add(atom); // This writes the atom to disk too
}
- columnIndexer.finish();
+ columnIndexer.maybeWriteEmptyRowHeader();
dataFile.stream.writeShort(END_OF_ROW);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ
[02/10] git commit: add generation back to bucketing (filtering) sort
Posted by jb...@apache.org.
add generation back to bucketing (filtering) sort
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a944940
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a944940
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a944940
Branch: refs/heads/trunk
Commit: 9a9449406ede4e9efcd14de0fbc5c0e43272cee5
Parents: 3728530
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:36:10 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500
----------------------------------------------------------------------
.../db/compaction/SizeTieredCompactionStrategy.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a944940/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 5115860..09d4e8e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -90,7 +90,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@VisibleForTesting
static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
{
- // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+ // sort the sstables by hotness (coldest-first)
Collections.sort(sstables, new Comparator<SSTableReader>()
{
public int compare(SSTableReader o1, SSTableReader o2)
@@ -99,7 +99,14 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
if (comparison != 0)
return comparison;
- return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+ // break ties with size on disk (mainly for system tables and cold tables)
+ comparison = Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+ if (comparison != 0)
+ return comparison;
+
+ // if there's still a tie, use generation, which is guaranteed to be unique. this ensures that
+ // our filtering is deterministic, which can be useful when debugging.
+ return o1.descriptor.generation - o2.descriptor.generation;
}
});
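The three-level sort above, hotness first, then size on disk, then generation, makes the coldest-first ordering total: generation is unique per sstable, so ties cannot survive and the filtering result is reproducible run to run. A standalone sketch with chained comparators (field names assumed, record used for brevity):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class ColdestFirstComparatorSketch
    {
        record SSTable(double readsPerSec, long bytesOnDisk, int generation) { }

        public static void main(String[] args)
        {
            List<SSTable> sstables = new ArrayList<>(List.of(
                new SSTable(0.0, 1024, 7),
                new SSTable(0.0, 1024, 3),  // same hotness and size: generation decides
                new SSTable(5.0, 2048, 1)));

            sstables.sort(Comparator.comparingDouble(SSTable::readsPerSec)
                                    .thenComparingLong(SSTable::bytesOnDisk)
                                    .thenComparingInt(SSTable::generation));

            sstables.forEach(s -> System.out.println(s.generation())); // 3, 7, 1
        }
    }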