You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/12 19:47:07 UTC
[2/8] git commit: Merge branch 'cassandra-2.0' into trunk
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d53c838c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d53c838c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d53c838c
Branch: refs/heads/trunk
Commit: d53c838c9d2f89ac6c88c8306f2302f7fbc6b33d
Parents: 448e4d4 3edb62b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:17:45 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:18:14 2013 +0600
----------------------------------------------------------------------
.../db/AbstractThreadUnsafeSortedColumns.java | 6 +-
.../cassandra/db/AtomicSortedColumns.java | 4 +-
.../org/apache/cassandra/db/ColumnFamily.java | 11 ++-
.../apache/cassandra/db/ColumnFamilyStore.java | 26 ++++++-
.../org/apache/cassandra/db/ColumnIndex.java | 2 +-
.../org/apache/cassandra/db/DeletionInfo.java | 76 ++++++++++++++-----
.../org/apache/cassandra/db/DeletionTime.java | 16 ++++
.../apache/cassandra/db/RangeTombstoneList.java | 2 +-
.../db/compaction/CompactionController.java | 5 +-
.../db/compaction/LazilyCompactedRow.java | 77 +++++++++++---------
test/unit/org/apache/cassandra/Util.java | 6 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 2 +-
.../db/compaction/CompactionsPurgeTest.java | 77 ++++++++++++++++++--
.../streaming/StreamingTransferTest.java | 4 +-
14 files changed, 236 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 396bbd3,d585407..4e54af0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -885,6 -898,17 +897,16 @@@ public class ColumnFamilyStore implemen
return null;
}
- removeDeletedColumnsOnly(cf, gcBefore, indexer);
- return removeDeletedCF(cf, gcBefore);
++ return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore);
+ }
+
+ /**
+ * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
+ * columns that have been dropped from the schema (for CQL3 tables only).
+ * @return the updated ColumnFamily
+ */
- public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
++ public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+ {
Iterator<Column> iter = cf.iterator();
DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
@@@ -899,10 -924,15 +921,10 @@@
{
iter.remove();
indexer.remove(c);
- removedBytes += c.dataSize();
}
}
- return removedBytes;
- }
- return removeDeletedCF(cf, gcBefore);
- public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
- {
- return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
++ return cf;
}
// returns true if
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index dc7730c,7edc60e..c4ce2e8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -148,24 -153,25 +148,25 @@@ public class CompactionControlle
}
/**
- * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
- * older than @param maxDeletionTimestamp are included in the compaction set
+ * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
- * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be supressed
- * in other sstables.
++ * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
++ * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
++ * participating in this compaction, or Long.MAX_VALUE if no such SSTable exists.
*/
- public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+ public long maxPurgeableTimestamp(DecoratedKey key)
{
List<SSTableReader> filteredSSTables = overlappingTree.search(key);
+ long min = Long.MAX_VALUE;
for (SSTableReader sstable : filteredSSTables)
{
- if (sstable.getMinTimestamp() <= maxDeletionTimestamp)
- {
- // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
- // we check index file instead.
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
- return false;
- else if (sstable.getBloomFilter().isPresent(key.key))
- return false;
- }
+ // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
+ // we check index file instead.
+ if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
+ min = Math.min(min, sstable.getMinTimestamp());
+ else if (sstable.getBloomFilter().isPresent(key.key))
+ min = Math.min(min, sstable.getMinTimestamp());
}
- return true;
+ return min;
}
public void invalidateCachedRow(DecoratedKey key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 2cb014a,3b7a3d4..8237ff5
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@@ -56,8 -58,8 +56,9 @@@ public class LazilyCompactedRow extend
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
- private long maxTombstoneTimestamp;
+ private final Reducer reducer;
+ private final Iterator<OnDiskAtom> merger;
+ private DeletionInfo deletionInfo;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@@ -66,39 -68,27 +67,38 @@@
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- ColumnFamily rawCf = null;
+ // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be
+ // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
+ deletionInfo = DeletionInfo.live();
- maxTombstoneTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
-- {
- ColumnFamily cf = row.getColumnFamily();
- DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
- maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
- deletionInfo = deletionInfo.add(delInfo);
- }
++ deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo());
- if (rawCf == null)
- rawCf = cf;
- else
- rawCf.delete(cf);
- }
- // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
- // tombstones newer than the row-level tombstones we've seen -- but we won't know that
- // until we iterate over them. By passing MAX_VALUE we will only purge if there are
- // no other versions of this row present.
- this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
++ // tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable
++ // containing `key` outside of the set of sstables involved in this compaction.
+ maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
- // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
- // to get rid of redundant row-level and range tombstones
- assert rawCf != null;
- int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE;
- ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
- emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+
+ emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
+ emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
- if (shouldPurge)
++ if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp)
+ emptyColumnFamily.purgeTombstones(controller.gcBefore);
+
+ reducer = new Reducer();
+ merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
+ }
+
- public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
++ private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+ {
- // We should only gc tombstone if shouldPurge == true. But otherwise,
- // it is still ok to collect column that shadowed by their (deleted)
- // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
- ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
- shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
- controller.cfs.indexManager.updaterFor(key));
- if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
- CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
- return compacted;
++ // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
++ // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
++ // without purging tombstones.
++ int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
++ ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
++
++ // if we have counters, remove old shards
++ if (cf.metadata().getDefaultValidator().isCommutative())
++ CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
++
++ return cf;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@@ -109,31 -99,31 +109,29 @@@
try
{
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- columnsIndex = indexBuilder.buildForCompaction(iterator());
+ columnsIndex = indexBuilder.buildForCompaction(merger);
- if (columnsIndex.columnsIndex.isEmpty())
- {
- boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
- ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
- : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
- if (cfIrrelevant)
- return null;
- }
+
+ // if there aren't any columns or tombstones, return null
+ if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
+ return null;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
// reach into the reducer (created during iteration) to get column count, size, max column timestamp
- // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
- columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
- reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
- reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
- reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
- reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
- reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
+ columnStats = new ColumnStats(reducer.columns,
+ reducer.minTimestampSeen,
+ Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen),
+ reducer.maxLocalDeletionTimeSeen,
+ reducer.tombstones,
+ reducer.minColumnNameSeen,
+ reducer.maxColumnNameSeen
);
- reducer = null;
+ // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
indexBuilder.maybeWriteEmptyRowHeader();
+
out.writeShort(SSTableWriter.END_OF_ROW);
close();
@@@ -245,8 -257,9 +252,10 @@@
}
else
{
+ boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
+ // when we clear() the container, it removes the deletion info, so this needs to be reset each time
+ container.setDeletionInfo(deletionInfo);
- ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+ ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 18e637b,8461023..bd814d0
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@@ -38,8 -38,10 +38,11 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.Util;
import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++
+ import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
import org.apache.cassandra.utils.ByteBufferUtil;
@@@ -286,8 -283,10 +289,9 @@@ public class CompactionsPurgeTest exten
String cfName = "Standard1";
Keyspace keyspace = Keyspace.open(keyspaceName);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
DecoratedKey key = Util.dk("key3");
RowMutation rm;
+ QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis());
// inserts
rm = new RowMutation(keyspaceName, key.key);
@@@ -316,4 -322,60 +323,60 @@@
for (Column c : cf)
assert !c.isMarkedForDelete(System.currentTimeMillis());
}
- }
+
+ @Test
+ public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException
+ {
+ String keyspace = "cql_keyspace";
+ String table = "table1";
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ cfs.disableAutoCompaction();
+
+ // write a row out to one sstable
+ processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ keyspace, table, 1, "foo", 1));
+ cfs.forceBlockingFlush();
+
+ UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(1, result.size());
+
+ // write a row tombstone out to a second sstable
+ processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ cfs.forceBlockingFlush();
+
+ // basic check that the row is considered deleted
+ assertEquals(2, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+
+ // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
+ Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
+ future.get();
+
+ // the data should be gone, but the tombstone should still exist
+ assertEquals(1, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+
+ // write a row out to one sstable
+ processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ keyspace, table, 1, "foo", 1));
+ cfs.forceBlockingFlush();
+ assertEquals(2, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(1, result.size());
+
+ // write a row tombstone out to a different sstable
+ processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ cfs.forceBlockingFlush();
+
+ // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
+ future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
+ future.get();
+
+ // both the data and the tombstone should be gone this time
+ assertEquals(0, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+ }
-}
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------