You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/12 19:47:06 UTC
[1/8] git commit: Fix row tombstones in larger-than-memory
compactions patch by thobbs; reviewed by jbellis for CASSANDRA-6008
Updated Branches:
refs/heads/trunk d16d5c4f2 -> a9b93c257
Fix row tombstones in larger-than-memory compactions
patch by thobbs; reviewed by jbellis for CASSANDRA-6008
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3edb62bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3edb62bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3edb62bf
Branch: refs/heads/trunk
Commit: 3edb62bf773617aeb3a348edc5667a6b0bad0ffe
Parents: e6eb550
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 12 23:28:13 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:17:33 2013 +0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/AbstractThreadUnsafeSortedColumns.java | 6 +-
.../cassandra/db/AtomicSortedColumns.java | 4 +-
.../org/apache/cassandra/db/ColumnFamily.java | 11 ++-
.../apache/cassandra/db/ColumnFamilyStore.java | 21 ++++-
.../org/apache/cassandra/db/ColumnIndex.java | 2 +-
.../org/apache/cassandra/db/DeletionInfo.java | 76 +++++++++++++-----
.../org/apache/cassandra/db/DeletionTime.java | 16 ++++
.../apache/cassandra/db/RangeTombstoneList.java | 2 +-
.../db/compaction/LazilyCompactedRow.java | 54 +++++++------
test/unit/org/apache/cassandra/Util.java | 6 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 2 +-
.../db/compaction/CompactionsPurgeTest.java | 84 +++++++++++++++++---
.../streaming/StreamingTransferTest.java | 4 +-
14 files changed, 220 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 30f863e..d573e37 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.4
+ * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
* Fix cleanup ClassCastException (CASSANDRA-6462)
* Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
* Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index 1b245eb..36b051b 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -59,7 +59,11 @@ public abstract class AbstractThreadUnsafeSortedColumns extends ColumnFamily
deletionInfo = newInfo;
}
- public void maybeResetDeletionTimes(int gcBefore)
+ /**
+ * Purges any tombstones with a local deletion time before gcBefore.
+ * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
+ */
+ public void purgeTombstones(int gcBefore)
{
deletionInfo.purge(gcBefore);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index f6a6b83..b44d8bf 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -120,12 +120,12 @@ public class AtomicSortedColumns extends ColumnFamily
ref.set(ref.get().with(newInfo));
}
- public void maybeResetDeletionTimes(int gcBefore)
+ public void purgeTombstones(int gcBefore)
{
while (true)
{
Holder current = ref.get();
- if (!current.deletionInfo.hasIrrelevantData(gcBefore))
+ if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
break;
DeletionInfo purgedInfo = current.deletionInfo.copy();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 47b14b9..2c00071 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -185,7 +185,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public abstract void delete(DeletionTime deletionTime);
protected abstract void delete(RangeTombstone tombstone);
- public abstract void maybeResetDeletionTimes(int gcBefore);
+ /**
+ * Purges top-level and range tombstones whose localDeletionTime is older than gcBefore.
+ * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
+ */
+ public abstract void purgeTombstones(int gcBefore);
/**
* Adds a column to this column map.
@@ -268,6 +272,9 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
*/
public abstract boolean isInsertReversed();
+ /**
+ * If {@code columns} has any tombstones (top-level or range tombstones), they will be applied to this set of columns.
+ */
public void delete(ColumnFamily columns)
{
delete(columns.deletionInfo());
@@ -459,7 +466,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
public boolean hasIrrelevantData(int gcBefore)
{
// Do we have gcable deletion infos?
- if (deletionInfo().hasIrrelevantData(gcBefore))
+ if (deletionInfo().hasPurgeableTombstones(gcBefore))
return true;
// Do we have colums that are either deleted by the container or gcable tombstone?
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index eb715ac..d585407 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -862,12 +862,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ /**
+ * Purges gc-able top-level and range tombstones, returning {@code cf} if there are any columns or tombstones left,
+ * null otherwise.
+ * @param gcBefore a timestamp (in seconds); tombstones with a localDeletionTime before this will be purged
+ */
public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
{
- cf.maybeResetDeletionTimes(gcBefore);
+ // purge old top-level and range tombstones
+ cf.purgeTombstones(gcBefore);
+
+ // if there are no columns or tombstones left, return null
return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf;
}
+ /**
+ * Removes deleted columns and purges gc-able tombstones.
+ * @return an updated {@code cf} if any columns or tombstones remain, null otherwise
+ */
public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
{
return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater);
@@ -890,7 +902,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return removeDeletedCF(cf, gcBefore);
}
- private static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+ /**
+ * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
+ * columns that have been dropped from the schema (for CQL3 tables only).
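+ * @return a long result of the removal pass (the method returns long, not a ColumnFamily); TODO(review): document exactly what this value measures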
+ */
+ public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
{
Iterator<Column> iter = cf.iterator();
DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 75a06d7..f6b38b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -67,7 +67,7 @@ public class ColumnIndex
private final RangeTombstone.Tracker tombstoneTracker;
private int atomCount;
private final ByteBuffer key;
- private final DeletionInfo deletionInfo;
+ private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
public Builder(ColumnFamily cf,
ByteBuffer key,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 4e1d68d..13fc824 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -29,15 +29,32 @@ import com.google.common.collect.Iterators;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.IVersionedSerializer;
+/**
+ * A combination of a top-level (or row) tombstone and range tombstones describing the deletions
+ * within a {@link ColumnFamily} (or row).
+ */
public class DeletionInfo
{
private static final Serializer serializer = new Serializer();
- // We don't have way to represent the full interval of keys (Interval don't support the minimum token as the right bound),
- // so we keep the topLevel deletion info separatly. This also slightly optimize the case of full row deletion which is rather common.
+ /**
+ * This represents a deletion of the entire row. We can't represent this within the RangeTombstoneList, so it's
+ * kept separately. This also slightly optimizes the common case of a full row deletion.
+ */
private DeletionTime topLevel;
- private RangeTombstoneList ranges; // null if no range tombstones (to save an allocation since it's a common case).
+ /**
+ * A list of range tombstones within the row. This is left as null if there are no range tombstones
+ * (to save an allocation, since it's a common case).
+ */
+ private RangeTombstoneList ranges;
+
+ /**
+ * Creates a DeletionInfo with only a top-level (row) tombstone.
+ * @param markedForDeleteAt the time after which the entire row should be considered deleted
+ * @param localDeletionTime what time the deletion write was applied locally (for purposes of
+ * purging the tombstone after gc_grace_seconds).
+ */
public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
{
// Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
@@ -61,17 +78,20 @@ public class DeletionInfo
this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
}
- public static DeletionInfo live()
- {
- return new DeletionInfo(DeletionTime.LIVE);
- }
-
private DeletionInfo(DeletionTime topLevel, RangeTombstoneList ranges)
{
this.topLevel = topLevel;
this.ranges = ranges;
}
+ /**
+ * Returns a new DeletionInfo that has no top-level tombstone or any range tombstones.
+ */
+ public static DeletionInfo live()
+ {
+ return new DeletionInfo(DeletionTime.LIVE);
+ }
+
public static Serializer serializer()
{
return serializer;
@@ -93,8 +113,7 @@ public class DeletionInfo
}
/**
- * Return whether a given column is deleted by the container having this
- * deletion info.
+ * Return whether a given column is deleted by the container having this deletion info.
*
* @param column the column to check.
* @return true if the column is deleted, false otherwise
@@ -137,8 +156,7 @@ public class DeletionInfo
/**
* Purge every tombstones that are older than {@code gcbefore}.
*
- * @param gcBefore timestamp (in seconds) before which tombstones should
- * be purged
+ * @param gcBefore timestamp (in seconds) before which tombstones should be purged
*/
public void purge(int gcBefore)
{
@@ -152,14 +170,24 @@ public class DeletionInfo
}
}
- public boolean hasIrrelevantData(int gcBefore)
+ /**
+ * Returns true if {@code purge} would remove the top-level tombstone or any of the range
+ * tombstones, false otherwise.
+ * @param gcBefore timestamp (in seconds) before which tombstones should be purged
+ */
+ public boolean hasPurgeableTombstones(int gcBefore)
{
if (topLevel.localDeletionTime < gcBefore)
return true;
- return ranges != null && ranges.hasIrrelevantData(gcBefore);
+ return ranges != null && ranges.hasPurgeableTombstones(gcBefore);
}
+ /**
+ * Potentially replaces the top-level tombstone with another, keeping whichever has the higher markedForDeleteAt
+ * timestamp.
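+ * @param newInfo the candidate top-level tombstone; it replaces the current one only if its markedForDeleteAt is higher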
+ */
public void add(DeletionTime newInfo)
{
if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt)
@@ -175,7 +203,9 @@ public class DeletionInfo
}
/**
- * Adds the provided deletion infos to the current ones.
+ * Combines another DeletionInfo with this one and returns the result. Whichever top-level tombstone
+ * has the higher markedForDeleteAt timestamp will be kept, along with its localDeletionTime. The
+ * range tombstones will be combined.
*
* @return this object.
*/
@@ -191,6 +221,9 @@ public class DeletionInfo
return this;
}
+ /**
+ * Returns the minimum timestamp in any of the range tombstones or the top-level tombstone.
+ */
public long minTimestamp()
{
return ranges == null
@@ -199,7 +232,7 @@ public class DeletionInfo
}
/**
- * The maximum timestamp mentioned by this DeletionInfo.
+ * Returns the maximum timestamp in any of the range tombstones or the top-level tombstone.
*/
public long maxTimestamp()
{
@@ -208,6 +241,9 @@ public class DeletionInfo
: Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt());
}
+ /**
+ * Returns the top-level (or "row") tombstone.
+ */
public DeletionTime getTopLevelDeletion()
{
return topLevel;
@@ -326,7 +362,7 @@ public class DeletionInfo
/**
* This object allow testing whether a given column (name/timestamp) is deleted
- * or not by this DeletionInfo, assuming that the column given to this
+ * or not by this DeletionInfo, assuming that the columns given to this
* object are passed in forward or reversed comparator sorted order.
*
* This is more efficient that calling DeletionInfo.isDeleted() repeatedly
@@ -336,9 +372,9 @@ public class DeletionInfo
{
/*
* Note that because because range tombstone are added to this DeletionInfo while we iterate,
- * ranges may be null initially and we need to wait the first range to create the tester (once
- * created the test will pick up new tombstones however). We do are guaranteed that a range tombstone
- * will be added *before* we test any column that it may delete so this is ok.
+ * `ranges` may be null initially and we need to wait for the first range to create the tester (once
+ * created, the tester will pick up new tombstones). We are guaranteed that a range tombstone
+ * will be added *before* we test any column that it may delete, so this is ok.
*/
private RangeTombstoneList.InOrderTester tester;
private final boolean reversed;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 3d6fad4..b80422c 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -27,11 +27,27 @@ import com.google.common.base.Objects;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.utils.ObjectSizes;
+/**
+ * A top-level (row) tombstone.
+ */
public class DeletionTime implements Comparable<DeletionTime>
{
+ /**
+ * A special DeletionTime that signifies that there is no top-level (row) tombstone.
+ */
public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+ /**
+ * A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which
+ * data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked
+ * for deletion at all.
+ */
public final long markedForDeleteAt;
+
+ /**
+ * The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is
+ * only used for purposes of purging the tombstone after gc_grace_seconds have elapsed.
+ */
public final int localDeletionTime;
public static final ISerializer<DeletionTime> serializer = new Serializer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index fe61916..dad9004 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -305,7 +305,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
/**
* Returns whether {@code purge(gcBefore)} would remove something or not.
*/
- public boolean hasIrrelevantData(int gcBefore)
+ public boolean hasPurgeableTombstones(int gcBefore)
{
for (int i = 0; i < size; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0cdbbb7..3b7a3d4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -59,6 +59,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
private long maxTombstoneTimestamp;
+ private DeletionInfo deletionInfo;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@ -67,17 +68,15 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- ColumnFamily rawCf = null;
+ // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be
+ // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge.
+ deletionInfo = DeletionInfo.live();
maxTombstoneTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
{
- ColumnFamily cf = row.getColumnFamily();
- maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
-
- if (rawCf == null)
- rawCf = cf;
- else
- rawCf.delete(cf);
+ DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
+ maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
+ deletionInfo = deletionInfo.add(delInfo);
}
// Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
@@ -86,12 +85,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// no other versions of this row present.
this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
- // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
- // to get rid of redundant row-level and range tombstones
- assert rawCf != null;
- int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
- ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
- emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+ emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
+ emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
+ if (shouldPurge)
+ emptyColumnFamily.purgeTombstones(controller.gcBefore);
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -103,14 +100,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
{
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
columnsIndex = indexBuilder.buildForCompaction(iterator());
- if (columnsIndex.columnsIndex.isEmpty())
- {
- boolean cfIrrelevant = shouldPurge
- ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
- : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
- if (cfIrrelevant)
- return null;
- }
+
+ // if there aren't any columns or tombstones, return null
+ if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
+ return null;
}
catch (IOException e)
{
@@ -128,7 +121,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
);
reducer = null;
+ // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
indexBuilder.maybeWriteEmptyRowHeader();
+
out.writeShort(SSTableWriter.END_OF_ROW);
close();
@@ -201,7 +196,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// in the container; we just want to leverage the conflict resolution code from CF
ColumnFamily container = emptyColumnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
- // tombstone reference; will be reconciled w/ column during getReduced
+ // tombstone reference; will be reconciled w/ column during getReduced. Note that the top-level (row) tombstone
+ // is held by LCR.deletionInfo.
RangeTombstone tombstone;
int columns = 0;
@@ -212,11 +208,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
+ /**
+ * Called once per version of a cell that we need to merge, after which getReduced() is called. In other words,
+ * this will be called one or more times with cells that share the same column name.
+ */
public void reduce(OnDiskAtom current)
{
if (current instanceof RangeTombstone)
{
- tombstone = (RangeTombstone)current;
+ if (tombstone == null || current.maxTimestamp() >= tombstone.maxTimestamp())
+ tombstone = (RangeTombstone)current;
}
else
{
@@ -235,6 +236,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
}
+ /**
+ * Called after reduce() has been called for each cell sharing the same name.
+ */
protected OnDiskAtom getReduced()
{
if (tombstone != null)
@@ -253,6 +257,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
else
{
+ // when we clear() the container, it removes the deletion info, so this needs to be reset each time
+ container.setDeletionInfo(deletionInfo);
ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index b4a375a..a71dc48 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -245,12 +245,12 @@ public class Util
assertTrue(ss.getTokenMetadata().isMember(hosts.get(i)));
}
- public static Future<?> compactAll(ColumnFamilyStore cfs)
+ public static Future<?> compactAll(ColumnFamilyStore cfs, int gcBefore)
{
- List<Descriptor> descriptors = new ArrayList<Descriptor>();
+ List<Descriptor> descriptors = new ArrayList<>();
for (SSTableReader sstable : cfs.getSSTables())
descriptors.add(sstable.descriptor);
- return CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE);
+ return CompactionManager.instance.submitUserDefined(cfs, descriptors, gcBefore);
}
public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index fd685b4..1f41860 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -145,7 +145,7 @@ public class KeyCacheTest extends SchemaLoader
assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
- Util.compactAll(cfs).get();
+ Util.compactAll(cfs, Integer.MAX_VALUE).get();
// after compaction cache should have entries for
// new SSTables, if we had 2 keys in cache previously it should become 4
assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 48c0b3c..8461023 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -21,12 +21,12 @@ package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
-
-import org.junit.Assert;
+import java.util.concurrent.Future;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.DecoratedKey;
@@ -38,7 +38,11 @@ import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.Util;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -144,7 +148,7 @@ public class CompactionsPurgeTest extends SchemaLoader
// verify that minor compaction still GC when key is present
// in a non-compacted sstable but the timestamp ensures we won't miss anything
cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
- Assert.assertEquals(1, cf.getColumnCount());
+ assertEquals(1, cf.getColumnCount());
}
@Test
@@ -181,8 +185,8 @@ public class CompactionsPurgeTest extends SchemaLoader
// we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
// so it would be invalid to assume we can throw out the c1 entry.
ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
- Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
- Assert.assertEquals(2, cf.getColumnCount());
+ assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
+ assertEquals(2, cf.getColumnCount());
}
@Test
@@ -216,7 +220,7 @@ public class CompactionsPurgeTest extends SchemaLoader
assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
// compact and test that the row is completely gone
- Util.compactAll(cfs).get();
+ Util.compactAll(cfs, Integer.MAX_VALUE).get();
assert cfs.getSSTables().isEmpty();
ColumnFamily cf = keyspace.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
assert cf == null : cf;
@@ -253,7 +257,7 @@ public class CompactionsPurgeTest extends SchemaLoader
// flush and major compact
cfs.forceBlockingFlush();
- Util.compactAll(cfs).get();
+ Util.compactAll(cfs, Integer.MAX_VALUE).get();
// re-inserts with timestamp lower than delete
rm = new RowMutation(keyspaceName, key.key);
@@ -282,6 +286,7 @@ public class CompactionsPurgeTest extends SchemaLoader
DecoratedKey key = Util.dk("key3");
RowMutation rm;
+ QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis());
// inserts
rm = new RowMutation(keyspaceName, key.key);
@@ -295,10 +300,13 @@ public class CompactionsPurgeTest extends SchemaLoader
rm = new RowMutation(keyspaceName, key.key);
rm.delete(cfName, 4);
rm.apply();
+ ColumnFamily cf = cfs.getColumnFamily(filter);
+ assertTrue(cf.isMarkedForDelete());
// flush and major compact (with tombstone purging)
cfs.forceBlockingFlush();
- Util.compactAll(cfs).get();
+ Util.compactAll(cfs, Integer.MAX_VALUE).get();
+ assertFalse(cfs.getColumnFamily(filter).isMarkedForDelete());
// re-inserts with timestamp lower than delete
rm = new RowMutation(keyspaceName, key.key);
@@ -308,10 +316,66 @@ public class CompactionsPurgeTest extends SchemaLoader
}
rm.apply();
- // Check that the second insert did went in
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
+ // Check that the second insert went in
+ cf = cfs.getColumnFamily(filter);
assertEquals(10, cf.getColumnCount());
for (Column c : cf)
assert !c.isMarkedForDelete(System.currentTimeMillis());
}
+
+ @Test
+ public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException
+ {
+ String keyspace = "cql_keyspace";
+ String table = "table1";
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ cfs.disableAutoCompaction();
+
+ // write a row out to one sstable
+ processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ keyspace, table, 1, "foo", 1));
+ cfs.forceBlockingFlush();
+
+ UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(1, result.size());
+
+ // write a row tombstone out to a second sstable
+ processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ cfs.forceBlockingFlush();
+
+ // basic check that the row is considered deleted
+ assertEquals(2, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+
+ // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
+ Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
+ future.get();
+
+ // the data should be gone, but the tombstone should still exist
+ assertEquals(1, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+
+ // write a row out to one sstable
+ processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ keyspace, table, 1, "foo", 1));
+ cfs.forceBlockingFlush();
+ assertEquals(2, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(1, result.size());
+
+ // write a row tombstone out to a different sstable
+ processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ cfs.forceBlockingFlush();
+
+ // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
+ future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
+ future.get();
+
+ // both the data and the tombstone should be gone this time
+ assertEquals(0, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7787314..7622728 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -136,7 +136,7 @@ public class StreamingTransferTest extends SchemaLoader
for (int i = 1; i <= 3; i++)
mutator.mutate("key" + i, "col" + i, timestamp);
cfs.forceBlockingFlush();
- Util.compactAll(cfs).get();
+ Util.compactAll(cfs, Integer.MAX_VALUE).get();
assertEquals(1, cfs.getSSTables().size());
// transfer the first and last key
@@ -468,7 +468,7 @@ public class StreamingTransferTest extends SchemaLoader
for (int i = 1; i <= 6000; i++)
mutator.mutate("key" + i, "col" + i, System.currentTimeMillis());
cfs.forceBlockingFlush();
- Util.compactAll(cfs).get();
+ Util.compactAll(cfs, Integer.MAX_VALUE).get();
SSTableReader sstable = cfs.getSSTables().iterator().next();
cfs.clearUnsafe();
[4/8] git commit: merge from 2.0
Posted by jb...@apache.org.
merge from 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0bfa210d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0bfa210d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0bfa210d
Branch: refs/heads/trunk
Commit: 0bfa210d071b664b37d6ba5ee4eda280f47d7b0e
Parents: d53c838 4e9a7b8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:34:09 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:34:09 2013 +0600
----------------------------------------------------------------------
.../db/compaction/LazilyCompactedRow.java | 30 ++++++++++++--------
1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bfa210d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 8237ff5,0d33b22..23457bc
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@@ -56,9 -58,7 +56,9 @@@ public class LazilyCompactedRow extend
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
+ private final Reducer reducer;
+ private final Iterator<OnDiskAtom> merger;
- private DeletionInfo deletionInfo;
+ private DeletionTime maxRowTombstone;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@@ -69,36 -69,25 +69,40 @@@
// Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be
// purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
- deletionInfo = DeletionInfo.live();
+ maxRowTombstone = DeletionTime.LIVE;
for (OnDiskAtomIterator row : rows)
- deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo());
+ {
+ DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion();
+ if (maxRowTombstone.compareTo(rowTombstone) < 0)
+ maxRowTombstone = rowTombstone;
+ }
-
- // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
- // tombstones newer than the row-level tombstones we've seen -- but we won't know that
- // until we iterate over them. By passing MAX_VALUE we will only purge if there are
- // no other versions of this row present.
- this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+ // tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable
+ // containing `key` outside of the set of sstables involved in this compaction.
+ maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
- emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
- emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
- if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp)
+ emptyColumnFamily = EmptyColumns.factory.create(controller.cfs.metadata);
+ emptyColumnFamily.delete(maxRowTombstone);
- if (shouldPurge)
++ if (maxRowTombstone.markedForDeleteAt < maxPurgeableTimestamp)
emptyColumnFamily.purgeTombstones(controller.gcBefore);
+
+ reducer = new Reducer();
+ merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
+ }
+
+ private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+ {
+ // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
+ // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
+ // without purging tombstones.
+ int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+ ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
+
+ // if we have counters, remove old shards
+ if (cf.metadata().getDefaultValidator().isCommutative())
+ CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
+
+ return cf;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@@ -252,19 -257,18 +257,20 @@@
}
else
{
+ boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
// when we clear() the container, it removes the deletion info, so this needs to be reset each time
- container.setDeletionInfo(deletionInfo);
- ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
- if (purged == null || !purged.iterator().hasNext())
+ container.delete(maxRowTombstone);
- ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
- if (purged == null || !purged.iterator().hasNext())
++ removeDeletedAndOldShards(key, shouldPurge, controller, container);
++ Iterator<Column> iter = container.iterator();
++ if (!iter.hasNext())
{
container.clear();
return null;
}
-- Column reduced = purged.iterator().next();
++ Column reduced = iter.next();
container.clear();
- // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
+ // removeDeletedAndOldShards have only checked the top-level CF deletion times,
// not the range tombstone. For that we use the columnIndexer tombstone tracker.
if (indexBuilder.tombstoneTracker().isDeleted(reduced))
{
[5/8] git commit: fix stats to omit purged row tombstone
Posted by jb...@apache.org.
fix stats to omit purged row tombstone
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d8da2ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d8da2ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d8da2ee
Branch: refs/heads/trunk
Commit: 0d8da2ee3c9de7b890b5630c3e0c74b8c80e63dc
Parents: 4e9a7b8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:35:41 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:35:41 2013 +0600
----------------------------------------------------------------------
.../org/apache/cassandra/db/compaction/LazilyCompactedRow.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d8da2ee/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0d33b22..0ad3de2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -112,7 +112,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxRowTombstone.markedForDeleteAt : Math.max(maxRowTombstone.markedForDeleteAt, reducer.maxTimestampSeen),
+ reducer == null ? emptyColumnFamily.maxTimestamp() : Math.max(emptyColumnFamily.maxTimestamp(), reducer.maxTimestampSeen),
reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
[7/8] git commit: add back shouldPurge check before counter merging
Posted by jb...@apache.org.
add back shouldPurge check before counter merging
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1711488
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1711488
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1711488
Branch: refs/heads/trunk
Commit: b1711488801781106c90e9143678f94d102e11dd
Parents: 5d25d6d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:44:13 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:44:13 2013 +0600
----------------------------------------------------------------------
.../apache/cassandra/db/compaction/LazilyCompactedRow.java | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1711488/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 23457bc..bb00d23 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -90,7 +90,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
}
- private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+ private static void removeDeletedAndOldShards(ColumnFamily cf, boolean shouldPurge, DecoratedKey key, CompactionController controller)
{
// We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
// are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
@@ -99,10 +99,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
// if we have counters, remove old shards
- if (cf.metadata().getDefaultValidator().isCommutative())
+ if (shouldPurge && cf.metadata().getDefaultValidator().isCommutative())
CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
-
- return cf;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -260,7 +258,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
// when we clear() the container, it removes the deletion info, so this needs to be reset each time
container.delete(maxRowTombstone);
- removeDeletedAndOldShards(key, shouldPurge, controller, container);
+ removeDeletedAndOldShards(container, shouldPurge, key, controller);
Iterator<Column> iter = container.iterator();
if (!iter.hasNext())
{
[2/8] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d53c838c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d53c838c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d53c838c
Branch: refs/heads/trunk
Commit: d53c838c9d2f89ac6c88c8306f2302f7fbc6b33d
Parents: 448e4d4 3edb62b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:17:45 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:18:14 2013 +0600
----------------------------------------------------------------------
.../db/AbstractThreadUnsafeSortedColumns.java | 6 +-
.../cassandra/db/AtomicSortedColumns.java | 4 +-
.../org/apache/cassandra/db/ColumnFamily.java | 11 ++-
.../apache/cassandra/db/ColumnFamilyStore.java | 26 ++++++-
.../org/apache/cassandra/db/ColumnIndex.java | 2 +-
.../org/apache/cassandra/db/DeletionInfo.java | 76 ++++++++++++++-----
.../org/apache/cassandra/db/DeletionTime.java | 16 ++++
.../apache/cassandra/db/RangeTombstoneList.java | 2 +-
.../db/compaction/CompactionController.java | 5 +-
.../db/compaction/LazilyCompactedRow.java | 77 +++++++++++---------
test/unit/org/apache/cassandra/Util.java | 6 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 2 +-
.../db/compaction/CompactionsPurgeTest.java | 77 ++++++++++++++++++--
.../streaming/StreamingTransferTest.java | 4 +-
14 files changed, 236 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 396bbd3,d585407..4e54af0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -885,6 -898,17 +897,16 @@@ public class ColumnFamilyStore implemen
return null;
}
- removeDeletedColumnsOnly(cf, gcBefore, indexer);
- return removeDeletedCF(cf, gcBefore);
++ return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore);
+ }
+
+ /**
+ * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
+ * columns that have been dropped from the schema (for CQL3 tables only).
+ * @return the updated ColumnFamily
+ */
- public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
++ public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+ {
Iterator<Column> iter = cf.iterator();
DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
@@@ -899,10 -924,15 +921,10 @@@
{
iter.remove();
indexer.remove(c);
- removedBytes += c.dataSize();
}
}
- return removedBytes;
- }
- return removeDeletedCF(cf, gcBefore);
- public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
- {
- return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
++ return cf;
}
// returns true if
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index dc7730c,7edc60e..c4ce2e8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -148,24 -153,25 +148,25 @@@ public class CompactionControlle
}
/**
- * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
- * older than @param maxDeletionTimestamp are included in the compaction set
+ * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
- * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be supressed
- * in other sstables.
++ * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
++ * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
++ * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
*/
- public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+ public long maxPurgeableTimestamp(DecoratedKey key)
{
List<SSTableReader> filteredSSTables = overlappingTree.search(key);
+ long min = Long.MAX_VALUE;
for (SSTableReader sstable : filteredSSTables)
{
- if (sstable.getMinTimestamp() <= maxDeletionTimestamp)
- {
- // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
- // we check index file instead.
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
- return false;
- else if (sstable.getBloomFilter().isPresent(key.key))
- return false;
- }
+ // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
+ // we check index file instead.
+ if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
+ min = Math.min(min, sstable.getMinTimestamp());
+ else if (sstable.getBloomFilter().isPresent(key.key))
+ min = Math.min(min, sstable.getMinTimestamp());
}
- return true;
+ return min;
}
public void invalidateCachedRow(DecoratedKey key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 2cb014a,3b7a3d4..8237ff5
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@@ -56,8 -58,8 +56,9 @@@ public class LazilyCompactedRow extend
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
- private long maxTombstoneTimestamp;
+ private final Reducer reducer;
+ private final Iterator<OnDiskAtom> merger;
+ private DeletionInfo deletionInfo;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@@ -66,39 -68,27 +67,38 @@@
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- ColumnFamily rawCf = null;
+ // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be
+ // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
+ deletionInfo = DeletionInfo.live();
- maxTombstoneTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
-- {
- ColumnFamily cf = row.getColumnFamily();
- DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
- maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
- deletionInfo = deletionInfo.add(delInfo);
- }
++ deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo());
- if (rawCf == null)
- rawCf = cf;
- else
- rawCf.delete(cf);
- }
- // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
- // tombstones newer than the row-level tombstones we've seen -- but we won't know that
- // until we iterate over them. By passing MAX_VALUE we will only purge if there are
- // no other versions of this row present.
- this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
++ // tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable
++ // containing `key` outside of the set of sstables involved in this compaction.
+ maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
- // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
- // to get rid of redundant row-level and range tombstones
- assert rawCf != null;
- int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE;
- ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
- emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+
+ emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
+ emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
- if (shouldPurge)
++ if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp)
+ emptyColumnFamily.purgeTombstones(controller.gcBefore);
+
+ reducer = new Reducer();
+ merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
+ }
+
- public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
++ private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+ {
- // We should only gc tombstone if shouldPurge == true. But otherwise,
- // it is still ok to collect column that shadowed by their (deleted)
- // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
- ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
- shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
- controller.cfs.indexManager.updaterFor(key));
- if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
- CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
- return compacted;
++ // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
++ // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
++ // without purging tombstones.
++ int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
++ ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
++
++ // if we have counters, remove old shards
++ if (cf.metadata().getDefaultValidator().isCommutative())
++ CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
++
++ return cf;
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@@ -109,31 -99,31 +109,29 @@@
try
{
indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- columnsIndex = indexBuilder.buildForCompaction(iterator());
+ columnsIndex = indexBuilder.buildForCompaction(merger);
- if (columnsIndex.columnsIndex.isEmpty())
- {
- boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
- ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
- : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
- if (cfIrrelevant)
- return null;
- }
+
+ // if there aren't any columns or tombstones, return null
+ if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
+ return null;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
// reach into the reducer (created during iteration) to get column count, size, max column timestamp
- // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
- columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
- reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
- reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
- reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
- reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
- reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
+ columnStats = new ColumnStats(reducer.columns,
+ reducer.minTimestampSeen,
+ Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen),
+ reducer.maxLocalDeletionTimeSeen,
+ reducer.tombstones,
+ reducer.minColumnNameSeen,
+ reducer.maxColumnNameSeen
);
- reducer = null;
+ // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
indexBuilder.maybeWriteEmptyRowHeader();
+
out.writeShort(SSTableWriter.END_OF_ROW);
close();
@@@ -245,8 -257,9 +252,10 @@@
}
else
{
+ boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
+ // when we clear() the container, it removes the deletion info, so this needs to be reset each time
+ container.setDeletionInfo(deletionInfo);
- ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+ ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 18e637b,8461023..bd814d0
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@@ -38,8 -38,10 +38,11 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.Util;
import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++
+ import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
import org.apache.cassandra.utils.ByteBufferUtil;
@@@ -286,8 -283,10 +289,9 @@@ public class CompactionsPurgeTest exten
String cfName = "Standard1";
Keyspace keyspace = Keyspace.open(keyspaceName);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
DecoratedKey key = Util.dk("key3");
RowMutation rm;
+ QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis());
// inserts
rm = new RowMutation(keyspaceName, key.key);
@@@ -316,4 -322,60 +323,60 @@@
for (Column c : cf)
assert !c.isMarkedForDelete(System.currentTimeMillis());
}
- }
+
+ @Test
+ public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException
+ {
+ String keyspace = "cql_keyspace";
+ String table = "table1";
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ cfs.disableAutoCompaction();
+
+ // write a row out to one sstable
+ processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ keyspace, table, 1, "foo", 1));
+ cfs.forceBlockingFlush();
+
+ UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(1, result.size());
+
+ // write a row tombstone out to a second sstable
+ processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ cfs.forceBlockingFlush();
+
+ // basic check that the row is considered deleted
+ assertEquals(2, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+
+ // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
+ Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
+ future.get();
+
+ // the data should be gone, but the tombstone should still exist
+ assertEquals(1, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+
+ // write a row out to one sstable
+ processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+ keyspace, table, 1, "foo", 1));
+ cfs.forceBlockingFlush();
+ assertEquals(2, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(1, result.size());
+
+ // write a row tombstone out to a different sstable
+ processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ cfs.forceBlockingFlush();
+
+ // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
+ future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
+ future.get();
+
+ // both the data and the tombstone should be gone this time
+ assertEquals(0, cfs.getSSTables().size());
+ result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+ assertEquals(0, result.size());
+ }
-}
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
[3/8] git commit: clarify that we only collect row-level tombstone in LCR constructor
Posted by jb...@apache.org.
clarify that we only collect row-level tombstone in LCR constructor
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e9a7b8c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e9a7b8c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e9a7b8c
Branch: refs/heads/trunk
Commit: 4e9a7b8c7fa55df9cda4ac06f77ee9c69b85314d
Parents: 3edb62b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 12 23:43:59 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:26:14 2013 +0600
----------------------------------------------------------------------
.../db/compaction/LazilyCompactedRow.java | 26 ++++++++++----------
1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e9a7b8c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 3b7a3d4..0d33b22 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -58,8 +58,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
private boolean closed;
private ColumnIndex.Builder indexBuilder;
private final SecondaryIndexManager.Updater indexer;
- private long maxTombstoneTimestamp;
- private DeletionInfo deletionInfo;
+ private DeletionTime maxRowTombstone;
public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
@@ -70,23 +69,23 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be
// purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
- deletionInfo = DeletionInfo.live();
- maxTombstoneTimestamp = Long.MIN_VALUE;
+ maxRowTombstone = DeletionTime.LIVE;
for (OnDiskAtomIterator row : rows)
{
- DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
- maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
- deletionInfo = deletionInfo.add(delInfo);
+ DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion();
+ if (maxRowTombstone.compareTo(rowTombstone) < 0)
+ maxRowTombstone = rowTombstone;
}
+
// Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
// tombstones newer than the row-level tombstones we've seen -- but we won't know that
// until we iterate over them. By passing MAX_VALUE we will only purge if there are
// no other versions of this row present.
this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
- emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
- emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
+ emptyColumnFamily = EmptyColumns.factory.create(controller.cfs.metadata);
+ emptyColumnFamily.delete(maxRowTombstone);
if (shouldPurge)
emptyColumnFamily.purgeTombstones(controller.gcBefore);
}
@@ -113,7 +112,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
- reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
+ reducer == null ? maxRowTombstone.markedForDeleteAt : Math.max(maxRowTombstone.markedForDeleteAt, reducer.maxTimestampSeen),
reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -193,8 +192,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
{
// all columns reduced together will have the same name, so there will only be one column
- // in the container; we just want to leverage the conflict resolution code from CF
- ColumnFamily container = emptyColumnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
+ // in the container; we just want to leverage the conflict resolution code from CF.
+ // (Note that we add the row tombstone in getReduced.)
+ final ColumnFamily container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata());
// tombstone reference; will be reconciled w/ column during getReduced. Note that the top-level (row) tombstone
// is held by LCR.deletionInfo.
@@ -258,7 +258,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
else
{
// when we clear() the container, it removes the deletion info, so this needs to be reset each time
- container.setDeletionInfo(deletionInfo);
+ container.delete(maxRowTombstone);
ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
if (purged == null || !purged.iterator().hasNext())
{
[6/8] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d25d6d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d25d6d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d25d6d2
Branch: refs/heads/trunk
Commit: 5d25d6d22a17e64347e1d311921ae61b52cb3ae3
Parents: 0bfa210 0d8da2e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:35:48 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:35:48 2013 +0600
----------------------------------------------------------------------
----------------------------------------------------------------------
[8/8] git commit: Merge remote-tracking branch 'origin/trunk' into trunk
Posted by jb...@apache.org.
Merge remote-tracking branch 'origin/trunk' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b93c25
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b93c25
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b93c25
Branch: refs/heads/trunk
Commit: a9b93c257304a4bf76f301d483e3264fba934f80
Parents: b171148 d16d5c4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:46:57 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:46:57 2013 +0600
----------------------------------------------------------------------
----------------------------------------------------------------------