You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/03 14:17:03 UTC
cassandra git commit: Make sure RangeTombstone.Tracker only keeps the
ranges it needs to
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 8fcb620d9 -> b0dbea3dd
Make sure RangeTombstone.Tracker only keeps the ranges it needs to
patch by slebresne; reviewed by benedict for CASSANDRA-9486
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0dbea3d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0dbea3d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0dbea3d
Branch: refs/heads/cassandra-2.0
Commit: b0dbea3dd0bebac97ddc14a847cce54cc38a7177
Parents: 8fcb620
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jun 1 14:52:07 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jun 3 14:14:42 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DeletionTime.java | 5 +
.../org/apache/cassandra/db/RangeTombstone.java | 194 +++++++++++++------
.../db/compaction/LazilyCompactedRow.java | 3 +
4 files changed, 148 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c555a91..16ce060 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.16:
+ * Don't accumulate more ranges than necessary in RangeTombstone.Tracker (CASSANDRA-9486)
* Add broadcast and rpc addresses to system.local (CASSANDRA-9436)
* Always mark sstable suspect when corrupted (CASSANDRA-9478)
* Add database users and permissions to CQL3 documentation (CASSANDRA-7558)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index dd2ccaf..b39d681 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -114,6 +114,11 @@ public class DeletionTime implements Comparable<DeletionTime>
return column.timestamp() <= markedForDeleteAt;
}
+ public boolean supersedes(DeletionTime dt) // true iff this deletion was marked strictly later than dt
+ {
+ return this.markedForDeleteAt > dt.markedForDeleteAt; // strict: equal markedForDeleteAt does not supersede; localDeletionTime is not compared
+ }
+
public long memorySize()
{
long fields = TypeSizes.NATIVE.sizeof(markedForDeleteAt) + TypeSizes.NATIVE.sizeof(localDeletionTime);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 16fc27a..fe9da20 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -114,52 +114,73 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
return comparator.compare(min, rt.min) <= 0 && comparator.compare(max, rt.max) >= 0;
}
+ /**
+ * Tracks opened RangeTombstones when iterating over a partition.
+ * <p>
+ * This tracker must be provided all the atoms of a given partition in
+ * order (to the {@code update} method). Given this, it keeps enough
+ * information to be able to decide if an atom is deleted (shadowed)
+ * by a previously open RT. Once the tracker can prove a given range
+ * tombstone cannot be useful anymore (that is, as soon as we've seen an
+ * atom that is after the end of that RT), it discards this RT. In other
+ * words, the maximum memory used by this object should be proportional to
+ * the maximum number of RTs that can be simultaneously open (and this
+ * should be fairly low in practice).
+ */
public static class Tracker
{
private final Comparator<ByteBuffer> comparator;
- private final Deque<RangeTombstone> ranges = new ArrayDeque<RangeTombstone>();
- private final SortedSet<RangeTombstone> maxOrderingSet = new TreeSet<RangeTombstone>(new Comparator<RangeTombstone>()
- {
- public int compare(RangeTombstone t1, RangeTombstone t2)
- {
- return comparator.compare(t1.max, t2.max);
- }
- });
- public final Set<RangeTombstone> expired = new HashSet<RangeTombstone>();
+
+ // A list of the currently open RTs. We keep the list sorted in order of growing end bounds because, for a
+ // new atom, this lets us efficiently find the RTs that are now useless (if any). Also note that because
+ // atoms are passed to the tracker in order, any RT that is tracked can be assumed to be open, i.e. we
+ // never have to test an RT's start since it's always assumed to be less than what we have.
+ // Also note that this will store expired RTs (#7810). Those will be of type ExpiredRangeTombstone and
+ // will be ignored by writeOpenedMarker.
+ private final List<RangeTombstone> openedTombstones = new LinkedList<RangeTombstone>();
+
+ // Total number of atoms written by writeOpenedMarker().
private int atomCount;
+ /**
+ * Creates a new tracker for a table, given that table's comparator.
+ *
+ * @param comparator the comparator of the table whose atoms this tracker
+ * will track. The tracker assumes that atoms will later be provided to it
+ * in {@code comparator} order.
+ */
public Tracker(Comparator<ByteBuffer> comparator)
{
this.comparator = comparator;
}
/**
- * Compute RangeTombstone that are needed at the beginning of an index
+ * Computes the RangeTombstones that are needed at the beginning of an index
* block starting with {@code firstColumn}.
- * Returns the total serialized size of said tombstones and write them
- * to {@code out} it if isn't null.
+ *
+ * @return the total serialized size of said tombstones, which are also written to
+ * {@code out} if it isn't null.
*/
public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutput out, OnDiskAtom.Serializer atomSerializer) throws IOException
{
long size = 0;
- if (ranges.isEmpty())
+ if (openedTombstones.isEmpty())
return size;
/*
- * Compute the marker that needs to be written at the beginning of
- * this block. We need to write one if it the more recent
+ * Compute the markers that need to be written at the beginning of
+ * this block. We need to write one if it is the most recent
* (opened) tombstone for at least some part of its range.
*/
List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>();
outer:
- for (RangeTombstone tombstone : ranges)
+ for (RangeTombstone tombstone : openedTombstones)
{
- // If ever the first column is outside the range, skip it (in
- // case update() hasn't been called yet)
+ // If the first column is outside the range, skip it (in case update() hasn't been called yet)
if (comparator.compare(firstColumn.name(), tombstone.max) > 0)
continue;
- if (expired.contains(tombstone))
+ if (tombstone instanceof ExpiredRangeTombstone)
continue;
RangeTombstone updated = new RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data);
@@ -186,6 +207,9 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
return size;
}
+ /**
+ * The total number of atoms written by calls to the method {@link #writeOpenedMarker}.
+ */
public int writtenAtom()
{
return atomCount;
@@ -193,69 +217,129 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
/**
* Update this tracker given an {@code atom}.
- * If column is a Column, check if any tracked range is useless and
- * can be removed. If it is a RangeTombstone, add it to this tracker.
+ * <p>
+ * This method first tests whether some range tombstones can be discarded due
+ * to the knowledge of that new atom. Then, if it's a range tombstone,
+ * it adds it to the tracker.
+ * <p>
+ * Note that this method should be called on *every* atom of a partition for
+ * the tracker to work as efficiently as possible (#9486).
*/
public void update(OnDiskAtom atom, boolean isExpired)
{
- if (atom instanceof RangeTombstone)
+ // Get rid of now useless RTs
+ ListIterator<RangeTombstone> iterator = openedTombstones.listIterator();
+ while (iterator.hasNext())
{
- RangeTombstone t = (RangeTombstone)atom;
- // This could be a repeated marker already. If so, we already have a range in which it is
- // fully included. While keeping both would be ok functionaly, we could end up with a lot of
- // useless marker after a few compaction, so avoid this.
- for (RangeTombstone tombstone : maxOrderingSet.tailSet(t))
+ // If this tombstone stops before the new atom, it is now useless since it cannot cover this or any future
+ // atoms. Otherwise, if a RT ends after the new atom, then we know that's true of every following RT too
+ // since openedTombstones is sorted by end bounds
+ RangeTombstone t = iterator.next();
+ if (comparator.compare(atom.name(), t.max) > 0)
{
- // We only care about tombstone have the same max than t
- if (comparator.compare(t.max, tombstone.max) > 0)
- break;
-
- // Since it is assume tombstones are passed to this method in growing min order, it's enough to
- // check for the data to know is the current tombstone is included in a previous one
- if (tombstone.data.equals(t.data))
- return;
+ iterator.remove();
+ }
+ else
+ {
+ // If the atom is a RT, we'll add it next and for that we want to start by looking at the tombstone we just
+ // returned, so rewind the iterator.
+ iterator.previous();
+ break;
}
- ranges.addLast(t);
- maxOrderingSet.add(t);
- if (isExpired)
- expired.add(t);
}
- else
+
+ // If it's a RT, add it.
+ if (atom instanceof RangeTombstone)
{
- assert atom instanceof Column;
- Iterator<RangeTombstone> iter = maxOrderingSet.iterator();
- while (iter.hasNext())
+ RangeTombstone toAdd = (RangeTombstone)atom;
+ if (isExpired)
+ toAdd = new ExpiredRangeTombstone(toAdd);
+
+ // We want to maintain openedTombstones in end bounds order so we find where to insert the new element
+ // and add it. While doing so, we also check if that new tombstone fully shadows or is fully shadowed
+ // by an existing tombstone so we avoid tracking more tombstones than necessary (and we know this will
+ // at least happen for start-of-index-block repeated range tombstones).
+ while (iterator.hasNext())
{
- RangeTombstone tombstone = iter.next();
- if (comparator.compare(atom.name(), tombstone.max) > 0)
+ RangeTombstone existing = iterator.next();
+ int cmp = comparator.compare(toAdd.max, existing.max);
+ if (cmp > 0)
{
- // That tombstone is now useless
- iter.remove();
- ranges.remove(tombstone);
+ // the new one ends after the existing one. If the new one also happens to supersede
+ // the existing one, remove the existing one. In any case, we're not done yet.
+ if (toAdd.data.supersedes(existing.data))
+ iterator.remove();
}
else
{
- // Since we're iterating by growing end bound, if the current range
- // includes the column, so does all the next ones
+ // the new one is included in the existing one. If the new one supersedes the existing one,
+ // then we add the new one (and if both end at the same bound, we replace the existing one
+ // outright), otherwise the new one is shadowed and we ignore it. In any case, we're done.
+ if (toAdd.data.supersedes(existing.data))
+ {
+ if (cmp == 0)
+ iterator.set(toAdd);
+ else
+ insertBefore(toAdd, iterator);
+ }
return;
}
}
+ // If we reach here, either there were no remaining tombstones or the new one ends after all of them.
+ iterator.add(toAdd);
}
}
+ /**
+ * Adds the provided {@code tombstone} <em>before</em> the last element returned by {@code iterator.next()}.
+ * <p>
+ * This method assumes that {@code iterator.next()} has been called prior to this method call, i.e. that
+ * {@code iterator.hasPrevious() == true}. On return, the iterator is again positioned just after that element.
+ */
+ private static void insertBefore(RangeTombstone tombstone, ListIterator<RangeTombstone> iterator)
+ {
+ assert iterator.hasPrevious();
+ iterator.previous(); // step back over the element last returned by next()
+ iterator.add(tombstone); // ListIterator.add inserts at the cursor, i.e. right before that element
+ iterator.next(); // move past that element again so the caller's cursor position is restored
+ }
+
+ /**
+ * Tests if the provided column is deleted by one of the tombstones
+ * tracked by this tracker.
+ * <p>
+ * This method should be called on columns in the same order as for the update()
+ * method. Note that this method does not update the tracker so the update() method
+ * should still be called on {@code column} (it doesn't matter if update is called
+ * before or after this call). Returns true if some tracked RT deletes the column.
+ */
public boolean isDeleted(Column column)
{
- for (RangeTombstone tombstone : ranges)
+ // We know every tombstone kept is "open", i.e. starts before the column. So the
+ // column is deleted if any of the tracked tombstones ends at or after the column
+ // (this will be the case for every RT if update() has been called before this
+ // method, but we might have a few RTs to skip otherwise) and the RT deletion is
+ // not older than the column timestamp (ties count as deleted; expired RTs are checked too).
+ for (RangeTombstone tombstone : openedTombstones)
{
- if (comparator.compare(column.name(), tombstone.min) >= 0
- && comparator.compare(column.name(), tombstone.max) <= 0
+ if (comparator.compare(column.name(), tombstone.max) <= 0
&& tombstone.maxTimestamp() >= column.timestamp())
- {
return true;
- }
}
return false;
}
+
+ /**
+ * The tracker needs to track expired range tombstones while remembering that they are expired;
+ * this marker subclass (same min, max and data as the wrapped tombstone) records exactly that.
+ */
+ private static class ExpiredRangeTombstone extends RangeTombstone
+ {
+ private ExpiredRangeTombstone(RangeTombstone tombstone)
+ {
+ super(tombstone.min, tombstone.max, tombstone.data);
+ }
+ }
}
public static class Serializer implements ISSTableSerializer<RangeTombstone>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index f61cc2b..43801c6 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -304,6 +304,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// not the range tombstones. For that we use the columnIndexer tombstone tracker.
if (indexBuilder.tombstoneTracker().isDeleted(reduced))
{
+ // We skip that column so it won't be passed to the tracker by the index builder. So pass it now to
+ // make sure we still discard potentially unneeded RTs as soon as possible.
+ indexBuilder.tombstoneTracker().update(reduced, false);
indexer.remove(reduced);
return null;
}