Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/30 16:18:34 UTC
[06/10] git commit: Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274
Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4b5b0db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4b5b0db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4b5b0db
Branch: refs/heads/cassandra-2.0
Commit: d4b5b0dbc541b4b6249ccd1b507c777d7fc0bc4f
Parents: 7187a8a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:14:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:14:15 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 22 ++++----
.../db/compaction/AbstractCompactedRow.java | 4 +-
.../db/compaction/CompactionController.java | 2 +-
.../db/compaction/LazilyCompactedRow.java | 34 ++++++++----
.../cassandra/db/compaction/Scrubber.java | 52 ++++++++++++-------
.../cassandra/io/sstable/SSTableWriter.java | 2 +-
test/data/serialization/2.0/db.RowMutation.bin | Bin 3599 -> 3599 bytes
8 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73a2b5..31b944c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
* Compact hottest sstables first and optionally omit coldest from
compaction entirely (CASSANDRA-6109)
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..75a06d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,20 +140,22 @@ public class ColumnIndex
}
ColumnIndex index = build();
- finish();
+ maybeWriteEmptyRowHeader();
return index;
}
- public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+ /**
+ * The important distinction wrt build() is that we may be building for a row that ends up
+ * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+ * columns shadowed by expired tombstone). Thus, it is the caller's responsibility
+ * to decide whether to write the header for an empty row.
+ */
+ public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
{
- for (OnDiskAtom c : columns)
- add(c);
- ColumnIndex index = build();
-
- finish();
-
- return index;
+ while (columns.hasNext())
+ add(columns.next());
+ return build();
}
public void add(OnDiskAtom column) throws IOException
@@ -219,7 +221,7 @@ public class ColumnIndex
return result;
}
- public void finish() throws IOException
+ public void maybeWriteEmptyRowHeader() throws IOException
{
if (!deletionInfo.isLive())
maybeWriteRowHeader();
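
(Editor's note: a rough caller-side sketch of the API split above, not part of the patch. Variables such as cf, key, out, and atoms are assumed to be in scope, and canDropRowEntirely() is a hypothetical placeholder for whatever purge test a real caller performs.)

    // Sketch of the new contract: buildForCompaction() only streams and indexes the atoms
    // it is given; it never writes a header for an empty row on its own.
    ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.key, out);
    ColumnIndex index = builder.buildForCompaction(atoms);

    if (index.columnsIndex.isEmpty() && canDropRowEntirely(cf))
    {
        // every cell and tombstone was purged: skip the row instead of writing an empty header
    }
    else
    {
        // writes the row header only if a row-level tombstone exists and it has not been written yet
        builder.maybeWriteEmptyRowHeader();
        out.writeShort(SSTableWriter.END_OF_ROW);
    }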
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
/**
* write the row (size + column index + filter + column data, but NOT row key) to @param out.
- * It is an error to call this if isEmpty is false. (Because the key is appended first,
- * so we'd have an incomplete row written.)
*
* write() may change internal state; it is NOT valid to call write() or update() a second time.
+ *
+ * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
*/
public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
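
(Editor's note: a minimal sketch of what the nullable return means for callers, mirroring how Scrubber uses SSTableWriter.append() later in this patch; writer, compactedRow, emptyRows, and goodRows are assumed to be in scope.)

    // The compacted row may boil down to nothing but expired tombstones, in which case
    // no index entry is produced and the caller should not expect the row on disk.
    RowIndexEntry entry = writer.append(compactedRow);
    if (entry == null)
        emptyRows++;   // row was dropped entirely
    else
        goodRows++;    // row (or at least its tombstone header) was written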
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..65515d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -154,7 +154,7 @@ public class CompactionController
/**
* @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
- * are included in the compaction set
+ * older than @param maxDeletionTimestamp are included in the compaction set
*/
public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
{
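
(Editor's note: a hedged illustration of the refined contract; maxKnownTombstoneTimestamp is a hypothetical variable standing in for a caller that does know its newest deletion timestamp up front.)

    // If the newest relevant deletion timestamp is known, pass it: overlapping sstables that
    // only hold newer data for this key will then not block tombstone purging.
    boolean purge = controller.shouldPurge(key, maxKnownTombstoneTimestamp);

    // If it is NOT known (cell-level tombstones are only discovered while iterating, as in
    // LazilyCompactedRow below), pass Long.MAX_VALUE so purging is allowed only when no
    // sstable outside the compaction set contains the key at all.
    boolean conservativePurge = controller.shouldPurge(key, Long.MAX_VALUE);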
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..0cdbbb7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -52,13 +52,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
private final List<? extends OnDiskAtomIterator> rows;
private final CompactionController controller;
private final boolean shouldPurge;
- private ColumnFamily emptyColumnFamily;
+ private final ColumnFamily emptyColumnFamily;
private Reducer reducer;
private ColumnStats columnStats;
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
- private long maxDelTimestamp;
+ private long maxTombstoneTimestamp;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@ -67,18 +67,31 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- maxDelTimestamp = Long.MIN_VALUE;
+ ColumnFamily rawCf = null;
+ maxTombstoneTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
- maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
+ maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
- if (emptyColumnFamily == null)
- emptyColumnFamily = cf;
+ if (rawCf == null)
+ rawCf = cf;
else
- emptyColumnFamily.delete(cf);
+ rawCf.delete(cf);
}
- this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+
+ // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
+ // tombstones newer than the row-level tombstones we've seen -- but we won't know that
+ // until we iterate over them. By passing MAX_VALUE we will only purge if there are
+ // no other versions of this row present.
+ this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+
+ // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+ // to get rid of redundant row-level and range tombstones
+ assert rawCf != null;
+ int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+ ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+ emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,7 +102,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
try
{
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- columnsIndex = indexBuilder.build(this);
+ columnsIndex = indexBuilder.buildForCompaction(iterator());
if (columnsIndex.columnsIndex.isEmpty())
{
boolean cfIrrelevant = shouldPurge
@@ -107,7 +120,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+ reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -115,6 +128,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
);
reducer = null;
+ indexBuilder.maybeWriteEmptyRowHeader();
out.writeShort(SSTableWriter.END_OF_ROW);
close();
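
(Editor's note: a simplified, hedged outline of LazilyCompactedRow.write() after this change. rowIsIrrelevant stands in for the emptiness/purge test in the elided part of the hunk above, and the final RowIndexEntry construction is unchanged from the original method.)

    indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
    ColumnIndex columnsIndex = indexBuilder.buildForCompaction(iterator()); // streams cells; the row is never fully materialized

    if (columnsIndex.columnsIndex.isEmpty() && rowIsIrrelevant)
        return null;                          // row compacted away entirely; SSTableWriter.append() reports null to its caller

    indexBuilder.maybeWriteEmptyRowHeader();  // keep a bare row-tombstone header when that is all that survives
    out.writeShort(SSTableWriter.END_OF_ROW);
    // ... then build and return the RowIndexEntry for the row, as before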
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index d8a2745..708e929 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
private final OutputHandler outputHandler;
- private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+ private static final Comparator<Row> rowComparator = new Comparator<Row>()
{
- public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+ public int compare(Row r1, Row r2)
{
return r1.key.compareTo(r2.key);
}
};
- private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+ private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
{
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
public void scrub()
{
- outputHandler.output("Scrubbing " + sstable);
+ outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
// TODO errors when creating the writer may leave empty temp files.
writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- AbstractCompactedRow prevRow = null;
+ DecoratedKey prevKey = null;
while (!dataFile.isEOF())
{
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
if (dataSize > dataFile.length())
throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
outputHandler.warn("Index file contained a different key or row size; using key from data file");
}
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
key = sstable.partitioner.decorateKey(currentIndexKey);
try
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
+ if (prevKey != null && prevKey.compareTo(key) > 0)
{
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ saveOutOfOrderRow(prevKey, key, atoms);
continue;
}
+
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.append(compactedRow) == null)
emptyRows++;
else
goodRows++;
- prevRow = compactedRow;
+ prevKey = key;
}
catch (Throwable th2)
{
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
if (!outOfOrderRows.isEmpty())
{
SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
- for (AbstractCompactedRow row : outOfOrderRows)
- inOrderWriter.append(row);
+ for (Row row : outOfOrderRows)
+ inOrderWriter.append(row.key, row.cf);
newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
}
}
+ private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+ {
+ // TODO bitch if the row is too large? if it is there's not much we can do ...
+ outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+ // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
+ // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+ ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+ while (atoms.hasNext())
+ {
+ OnDiskAtom atom = atoms.next();
+ cf.addAtom(atom);
+ }
+ outOfOrderRows.add(new Row(key, cf));
+ }
+
public SSTableReader getNewSSTable()
{
return newSstable;
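
(Editor's note: the two near-identical loop bodies above reduce to the following pattern; a sketch, with the variables as named in the patch. Comparing DecoratedKeys directly means ordering problems are detected before any row is built, so a larger-than-memory row is still only streamed.)

    if (prevKey != null && prevKey.compareTo(key) > 0)
    {
        // buffer the mis-placed row; it is rewritten to a separate, correctly ordered sstable at the end
        saveOutOfOrderRow(prevKey, key, atoms);
    }
    else
    {
        // stream the row through LazilyCompactedRow; append() returns null if it compacts away entirely
        AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
        if (writer.append(compactedRow) == null)
            emptyRows++;
        else
            goodRows++;
        prevKey = key;
    }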
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
columnIndexer.add(atom); // This write the atom on disk too
}
- columnIndexer.finish();
+ columnIndexer.maybeWriteEmptyRowHeader();
dataFile.stream.writeShort(END_OF_ROW);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ