You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/03 14:17:03 UTC

cassandra git commit: Make sure RangeTombstone.Tracker only keeps the ranges it needs to

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 8fcb620d9 -> b0dbea3dd


Make sure RangeTombstone.Tracker only keeps the ranges it needs to

patch by slebresne; reviewed by benedict for CASSANDRA-9486


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0dbea3d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0dbea3d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0dbea3d

Branch: refs/heads/cassandra-2.0
Commit: b0dbea3dd0bebac97ddc14a847cce54cc38a7177
Parents: 8fcb620
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jun 1 14:52:07 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jun 3 14:14:42 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DeletionTime.java   |   5 +
 .../org/apache/cassandra/db/RangeTombstone.java | 194 +++++++++++++------
 .../db/compaction/LazilyCompactedRow.java       |   3 +
 4 files changed, 148 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c555a91..16ce060 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.16:
+ * Don't accumulate more range than necessary in RangeTombstone.Tracker (CASSANDRA-9486)
  * Add broadcast and rpc addresses to system.local (CASSANDRA-9436)
  * Always mark sstable suspect when corrupted (CASSANDRA-9478)
  * Add database users and permissions to CQL3 documentation (CASSANDRA-7558)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index dd2ccaf..b39d681 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -114,6 +114,11 @@ public class DeletionTime implements Comparable<DeletionTime>
         return column.timestamp() <= markedForDeleteAt;
     }
 
+    public boolean supersedes(DeletionTime dt)
+    {
+        return this.markedForDeleteAt > dt.markedForDeleteAt;
+    }
+
     public long memorySize()
     {
         long fields = TypeSizes.NATIVE.sizeof(markedForDeleteAt) + TypeSizes.NATIVE.sizeof(localDeletionTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 16fc27a..fe9da20 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -114,52 +114,73 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
         return comparator.compare(min, rt.min) <= 0 && comparator.compare(max, rt.max) >= 0;
     }
 
+    /**
+     * Tracks opened RangeTombstones when iterating over a partition.
+     * <p>
+     * This tracker must be provided all the atoms of a given partition in
+     * order (to the {@code update} method). Given this, it keeps enough
+     * information to be able to decide if an atom is deleted (shadowed)
+     * by a previously opened RT. Once the tracker can prove a given range
+     * tombstone cannot be useful anymore (that is, as soon as we've seen an
+     * atom that is after the end of that RT), it discards this RT. In other
+     * words, the maximum memory used by this object should be proportional to
+     * the maximum number of RT that can be simultaneously open (and this
+     * should be fairly low in practice).
+     */
     public static class Tracker
     {
         private final Comparator<ByteBuffer> comparator;
-        private final Deque<RangeTombstone> ranges = new ArrayDeque<RangeTombstone>();
-        private final SortedSet<RangeTombstone> maxOrderingSet = new TreeSet<RangeTombstone>(new Comparator<RangeTombstone>()
-        {
-            public int compare(RangeTombstone t1, RangeTombstone t2)
-            {
-                return comparator.compare(t1.max, t2.max);
-            }
-        });
-        public final Set<RangeTombstone> expired = new HashSet<RangeTombstone>();
+
+        // A list of the currently open RTs. We keep the list sorted in order of growing end bounds since, for a
+        // new atom, this allows us to efficiently find the RTs that are now useless (if any). Also note that because
+        // atoms are passed to the tracker in order, any RT that is tracked can be assumed to be opened, i.e. we
+        // never have to test the RT's start since it's always assumed to be less than what we have.
+        // Also note that this will store expired RTs (#7810). Those will be of type ExpiredRangeTombstone and
+        // will be ignored by writeOpenedMarker.
+        private final List<RangeTombstone> openedTombstones = new LinkedList<RangeTombstone>();
+
+        // Total number of atoms written by writeOpenedMarker().
         private int atomCount;
 
+        /**
+         * Creates a new tracker given the table comparator.
+         *
+         * @param comparator the comparator for the table this will track atoms
+         * for. The tracker assumes that atoms will be later provided to the
+         * tracker in {@code comparator} order.
+         */
         public Tracker(Comparator<ByteBuffer> comparator)
         {
             this.comparator = comparator;
         }
 
         /**
-         * Compute RangeTombstone that are needed at the beginning of an index
+         * Computes the RangeTombstone that are needed at the beginning of an index
          * block starting with {@code firstColumn}.
-         * Returns the total serialized size of said tombstones and write them
-         * to {@code out} it if isn't null.
+         *
+         * @return the total serialized size of said tombstones; they are also written to
+         * {@code out} if it isn't null.
          */
         public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutput out, OnDiskAtom.Serializer atomSerializer) throws IOException
         {
             long size = 0;
-            if (ranges.isEmpty())
+            if (openedTombstones.isEmpty())
                 return size;
 
             /*
-             * Compute the marker that needs to be written at the beginning of
-             * this block. We need to write one if it the more recent
+             * Compute the markers that need to be written at the beginning of
+             * this block. We need to write one if it is the most recent
              * (opened) tombstone for at least some part of its range.
              */
             List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>();
             outer:
-            for (RangeTombstone tombstone : ranges)
+            for (RangeTombstone tombstone : openedTombstones)
             {
-                // If ever the first column is outside the range, skip it (in
-                // case update() hasn't been called yet)
+                // If the first column is outside the range, skip it (in case update() hasn't been called yet)
                 if (comparator.compare(firstColumn.name(), tombstone.max) > 0)
                     continue;
 
-                if (expired.contains(tombstone))
+                if (tombstone instanceof ExpiredRangeTombstone)
                     continue;
 
                 RangeTombstone updated = new RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data);
@@ -186,6 +207,9 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
             return size;
         }
 
+        /**
+         * The total number of atoms written by calls to the method {@link #writeOpenedMarker}.
+         */
         public int writtenAtom()
         {
             return atomCount;
@@ -193,69 +217,129 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
 
         /**
          * Update this tracker given an {@code atom}.
-         * If column is a Column, check if any tracked range is useless and
-         * can be removed. If it is a RangeTombstone, add it to this tracker.
+         * <p>
+         * This method first tests whether some range tombstones can be discarded
+         * given the knowledge of that new atom. Then, if the atom is itself a range
+         * tombstone, it adds it to the tracker.
+         * <p>
+         * Note that this method should be called on *every* atom of a partition for
+         * the tracker to work as efficiently as possible (#9486).
          */
         public void update(OnDiskAtom atom, boolean isExpired)
         {
-            if (atom instanceof RangeTombstone)
+            // Get rid of now useless RTs
+            ListIterator<RangeTombstone> iterator = openedTombstones.listIterator();
+            while (iterator.hasNext())
             {
-                RangeTombstone t = (RangeTombstone)atom;
-                // This could be a repeated marker already. If so, we already have a range in which it is
-                // fully included. While keeping both would be ok functionaly, we could end up with a lot of
-                // useless marker after a few compaction, so avoid this.
-                for (RangeTombstone tombstone : maxOrderingSet.tailSet(t))
+                // If this tombstone stops before the new atom, it is now useless since it cannot cover this or any future
+                // atoms. Otherwise, if a RT ends after the new atom, then we know that's true of any following RT too
+                // since openedTombstones is sorted by end bounds.
+                RangeTombstone t = iterator.next();
+                if (comparator.compare(atom.name(), t.max) > 0)
                 {
-                    // We only care about tombstone have the same max than t
-                    if (comparator.compare(t.max, tombstone.max) > 0)
-                        break;
-
-                    // Since it is assume tombstones are passed to this method in growing min order, it's enough to
-                    // check for the data to know is the current tombstone is included in a previous one
-                    if (tombstone.data.equals(t.data))
-                        return;
+                    iterator.remove();
+                }
+                else
+                {
+                    // If the atom is a RT, we'll add it next and for that we want to start by looking at the atom we just
+                    // returned, so rewind the iterator.
+                    iterator.previous();
+                    break;
                 }
-                ranges.addLast(t);
-                maxOrderingSet.add(t);
-                if (isExpired)
-                    expired.add(t);
             }
-            else
+
+            // If it's a RT, adds it.
+            if (atom instanceof RangeTombstone)
             {
-                assert atom instanceof Column;
-                Iterator<RangeTombstone> iter = maxOrderingSet.iterator();
-                while (iter.hasNext())
+                RangeTombstone toAdd = (RangeTombstone)atom;
+                if (isExpired)
+                    toAdd = new ExpiredRangeTombstone(toAdd);
+
+                // We want to maintain openedTombstones in end bounds order so we find where to insert the new element
+                // and add it. While doing so, we also check if that new tombstone fully shadows or is fully shadowed
+                // by an existing tombstone so we avoid tracking more tombstones than necessary (and we know this will
+                // at least happen for start-of-index-block repeated range tombstones).
+                while (iterator.hasNext())
                 {
-                    RangeTombstone tombstone = iter.next();
-                    if (comparator.compare(atom.name(), tombstone.max) > 0)
+                    RangeTombstone existing = iterator.next();
+                    int cmp = comparator.compare(toAdd.max, existing.max);
+                    if (cmp > 0)
                     {
-                        // That tombstone is now useless
-                        iter.remove();
-                        ranges.remove(tombstone);
+                        // the new one covers more than the existing one. If the new one happens to also supersede
+                        // the existing one, remove the existing one. In any case, we're not done yet.
+                        if (toAdd.data.supersedes(existing.data))
+                            iterator.remove();
                     }
                     else
                     {
-                        // Since we're iterating by growing end bound, if the current range
-                        // includes the column, so does all the next ones
+                        // the new one is included in the existing one. If the new one supersedes the existing one,
+                        // then we add the new one (and if the new one ends like the existing one, we can actually remove
+                        // the existing one), otherwise we can actually ignore it. In any case, we're done.
+                        if (toAdd.data.supersedes(existing.data))
+                        {
+                            if (cmp == 0)
+                                iterator.set(toAdd);
+                            else
+                                insertBefore(toAdd, iterator);
+                        }
                         return;
                     }
                 }
+                // If we reach here, either we had no tombstones or the new one ends after all existing ones.
+                iterator.add(toAdd);
             }
         }
 
+        /**
+         * Adds the provided {@code tombstone} _before_ the last element returned by {@code iterator.next()}.
+         * <p>
+         * This method assumes that {@code iterator.next()} has been called prior to this method call, i.e. that
+         * {@code iterator.hasPrevious() == true}.
+         */
+        private static void insertBefore(RangeTombstone tombstone, ListIterator<RangeTombstone> iterator)
+        {
+            assert iterator.hasPrevious();
+            iterator.previous();
+            iterator.add(tombstone);
+            iterator.next();
+        }
+
+        /**
+         * Tests if the provided column is deleted by one of the tombstones
+         * tracked by this tracker.
+         * <p>
+         * This method should be called on columns in the same order as for the update()
+         * method. Note that this method does not update the tracker so the update() method
+         * should still be called on {@code column} (it doesn't matter if update is called
+         * before or after this call).
+         */
         public boolean isDeleted(Column column)
         {
-            for (RangeTombstone tombstone : ranges)
+            // We know every tombstone kept is "open", i.e. starts before the column. So the
+            // column is deleted if any of the tracked tombstones ends after the column
+            // (this will be the case of every RT if update() has been called before this
+            // method, but we might have a few RT to skip otherwise) and the RT deletion is
+            // actually more recent than the column timestamp.
+            for (RangeTombstone tombstone : openedTombstones)
             {
-                if (comparator.compare(column.name(), tombstone.min) >= 0
-                    && comparator.compare(column.name(), tombstone.max) <= 0
+                if (comparator.compare(column.name(), tombstone.max) <= 0
                     && tombstone.maxTimestamp() >= column.timestamp())
-                {
                     return true;
-                }
             }
             return false;
         }
+
+        /**
+         * The tracker needs to track expired range tombstones while remembering that they are
+         * expired, so this is what this class is used for.
+         */
+        private static class ExpiredRangeTombstone extends RangeTombstone
+        {
+            private ExpiredRangeTombstone(RangeTombstone tombstone)
+            {
+                super(tombstone.min, tombstone.max, tombstone.data);
+            }
+        }
     }
 
     public static class Serializer implements ISSTableSerializer<RangeTombstone>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index f61cc2b..43801c6 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -304,6 +304,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 // not the range tombstones. For that we use the columnIndexer tombstone tracker.
                 if (indexBuilder.tombstoneTracker().isDeleted(reduced))
                 {
+                    // We skip that column so it won't be passed to the tracker by the index builder. So pass it now to
+                    // make sure we still discard potentially un-needed RTs as soon as possible.
+                    indexBuilder.tombstoneTracker().update(reduced, false);
                     indexer.remove(reduced);
                     return null;
                 }