You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/12 19:46:26 UTC

[1/3] git commit: Fix row tombstones in larger-than-memory compactions patch by thobbs; reviewed by jbellis for CASSANDRA-6008

Updated Branches:
  refs/heads/cassandra-2.0 e6eb5506a -> 0d8da2ee3


Fix row tombstones in larger-than-memory compactions
patch by thobbs; reviewed by jbellis for CASSANDRA-6008


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3edb62bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3edb62bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3edb62bf

Branch: refs/heads/cassandra-2.0
Commit: 3edb62bf773617aeb3a348edc5667a6b0bad0ffe
Parents: e6eb550
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 12 23:28:13 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:17:33 2013 +0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/AbstractThreadUnsafeSortedColumns.java   |  6 +-
 .../cassandra/db/AtomicSortedColumns.java       |  4 +-
 .../org/apache/cassandra/db/ColumnFamily.java   | 11 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 21 ++++-
 .../org/apache/cassandra/db/ColumnIndex.java    |  2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   | 76 +++++++++++++-----
 .../org/apache/cassandra/db/DeletionTime.java   | 16 ++++
 .../apache/cassandra/db/RangeTombstoneList.java |  2 +-
 .../db/compaction/LazilyCompactedRow.java       | 54 +++++++------
 test/unit/org/apache/cassandra/Util.java        |  6 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  2 +-
 .../db/compaction/CompactionsPurgeTest.java     | 84 +++++++++++++++++---
 .../streaming/StreamingTransferTest.java        |  4 +-
 14 files changed, 220 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 30f863e..d573e37 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.4
+ * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
  * Fix cleanup ClassCastException (CASSANDRA-6462)
  * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
  * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index 1b245eb..36b051b 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -59,7 +59,11 @@ public abstract class AbstractThreadUnsafeSortedColumns extends ColumnFamily
         deletionInfo = newInfo;
     }
 
-    public void maybeResetDeletionTimes(int gcBefore)
+    /**
+     * Purges any tombstones with a local deletion time before gcBefore.
+     * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
+     */
+    public void purgeTombstones(int gcBefore)
     {
         deletionInfo.purge(gcBefore);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index f6a6b83..b44d8bf 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -120,12 +120,12 @@ public class AtomicSortedColumns extends ColumnFamily
         ref.set(ref.get().with(newInfo));
     }
 
-    public void maybeResetDeletionTimes(int gcBefore)
+    public void purgeTombstones(int gcBefore)
     {
         while (true)
         {
             Holder current = ref.get();
-            if (!current.deletionInfo.hasIrrelevantData(gcBefore))
+            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
                 break;
 
             DeletionInfo purgedInfo = current.deletionInfo.copy();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 47b14b9..2c00071 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -185,7 +185,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public abstract void delete(DeletionTime deletionTime);
     protected abstract void delete(RangeTombstone tombstone);
 
-    public abstract void maybeResetDeletionTimes(int gcBefore);
+    /**
+     * Purges top-level and range tombstones whose localDeletionTime is older than gcBefore.
+     * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
+     */
+    public abstract void purgeTombstones(int gcBefore);
 
     /**
      * Adds a column to this column map.
@@ -268,6 +272,9 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
      */
     public abstract boolean isInsertReversed();
 
+    /**
+     * If `columns` has any tombstones (top-level or range tombstones), they will be applied to this set of columns.
+     */
     public void delete(ColumnFamily columns)
     {
         delete(columns.deletionInfo());
@@ -459,7 +466,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public boolean hasIrrelevantData(int gcBefore)
     {
         // Do we have gcable deletion infos?
-        if (deletionInfo().hasIrrelevantData(gcBefore))
+        if (deletionInfo().hasPurgeableTombstones(gcBefore))
             return true;
 
         // Do we have colums that are either deleted by the container or gcable tombstone?

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index eb715ac..d585407 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -862,12 +862,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    /**
+     * Purges gc-able top-level and range tombstones, returning `cf` if there are any columns or tombstones left,
+     * null otherwise.
+     * @param gcBefore a timestamp (in seconds); tombstones with a localDeletionTime before this will be purged
+     */
     public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
     {
-        cf.maybeResetDeletionTimes(gcBefore);
+        // purge old top-level and range tombstones
+        cf.purgeTombstones(gcBefore);
+
+        // if there are no columns or tombstones left, return null
         return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf;
     }
 
+    /**
+     * Removes deleted columns and purges gc-able tombstones.
+     * @return an updated `cf` if any columns or tombstones remain, null otherwise
+     */
     public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
     {
         return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater);
@@ -890,7 +902,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return removeDeletedCF(cf, gcBefore);
     }
 
-    private static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+    /**
+     * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
+     * columns that have been dropped from the schema (for CQL3 tables only).
+     * @return a long count, not the ColumnFamily (presumably the number of bytes removed -- TODO confirm against the method body)
+     */
+    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
         Iterator<Column> iter = cf.iterator();
         DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 75a06d7..f6b38b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -67,7 +67,7 @@ public class ColumnIndex
         private final RangeTombstone.Tracker tombstoneTracker;
         private int atomCount;
         private final ByteBuffer key;
-        private final DeletionInfo deletionInfo;
+        private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
 
         public Builder(ColumnFamily cf,
                        ByteBuffer key,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 4e1d68d..13fc824 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -29,15 +29,32 @@ import com.google.common.collect.Iterators;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
 
+/**
+ * A combination of a top-level (or row) tombstone and range tombstones describing the deletions
+ * within a {@link ColumnFamily} (or row).
+ */
 public class DeletionInfo
 {
     private static final Serializer serializer = new Serializer();
 
-    // We don't have way to represent the full interval of keys (Interval don't support the minimum token as the right bound),
-    // so we keep the topLevel deletion info separatly. This also slightly optimize the case of full row deletion which is rather common.
+    /**
+     * This represents a deletion of the entire row.  We can't represent this within the RangeTombstoneList, so it's
+     * kept separately.  This also slightly optimizes the common case of a full row deletion.
+     */
     private DeletionTime topLevel;
-    private RangeTombstoneList ranges; // null if no range tombstones (to save an allocation since it's a common case).
 
+    /**
+     * A list of range tombstones within the row.  This is left as null if there are no range tombstones
+     * (to save an allocation, since it's a common case).
+     */
+    private RangeTombstoneList ranges;
+
+    /**
+     * Creates a DeletionInfo with only a top-level (row) tombstone.
+     * @param markedForDeleteAt the time after which the entire row should be considered deleted
+     * @param localDeletionTime what time the deletion write was applied locally (for purposes of
+     *                          purging the tombstone after gc_grace_seconds).
+     */
     public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
     {
         // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
@@ -61,17 +78,20 @@ public class DeletionInfo
         this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
     }
 
-    public static DeletionInfo live()
-    {
-        return new DeletionInfo(DeletionTime.LIVE);
-    }
-
     private DeletionInfo(DeletionTime topLevel, RangeTombstoneList ranges)
     {
         this.topLevel = topLevel;
         this.ranges = ranges;
     }
 
+    /**
+     * Returns a new DeletionInfo that has no top-level tombstone or any range tombstones.
+     */
+    public static DeletionInfo live()
+    {
+        return new DeletionInfo(DeletionTime.LIVE);
+    }
+
     public static Serializer serializer()
     {
         return serializer;
@@ -93,8 +113,7 @@ public class DeletionInfo
     }
 
     /**
-     * Return whether a given column is deleted by the container having this
-     * deletion info.
+     * Return whether a given column is deleted by the container having this deletion info.
      *
      * @param column the column to check.
      * @return true if the column is deleted, false otherwise
@@ -137,8 +156,7 @@ public class DeletionInfo
     /**
      * Purge every tombstones that are older than {@code gcbefore}.
      *
-     * @param gcBefore timestamp (in seconds) before which tombstones should
-     * be purged
+     * @param gcBefore timestamp (in seconds) before which tombstones should be purged
      */
     public void purge(int gcBefore)
     {
@@ -152,14 +170,24 @@ public class DeletionInfo
         }
     }
 
-    public boolean hasIrrelevantData(int gcBefore)
+    /**
+     * Returns true if {@code purge} would remove the top-level tombstone or any of the range
+     * tombstones, false otherwise.
+     * @param gcBefore timestamp (in seconds) before which tombstones should be purged
+     */
+    public boolean hasPurgeableTombstones(int gcBefore)
     {
         if (topLevel.localDeletionTime < gcBefore)
             return true;
 
-        return ranges != null && ranges.hasIrrelevantData(gcBefore);
+        return ranges != null && ranges.hasPurgeableTombstones(gcBefore);
     }
 
+    /**
+     * Potentially replaces the top-level tombstone with another, keeping whichever has the higher markedForDeleteAt
+     * timestamp.
+     * @param newInfo the candidate top-level tombstone to compare against the current one
+     */
     public void add(DeletionTime newInfo)
     {
         if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt)
@@ -175,7 +203,9 @@ public class DeletionInfo
     }
 
     /**
-     * Adds the provided deletion infos to the current ones.
+     * Combines another DeletionInfo with this one and returns the result.  Whichever top-level tombstone
+     * has the higher markedForDeleteAt timestamp will be kept, along with its localDeletionTime.  The
+     * range tombstones will be combined.
      *
      * @return this object.
      */
@@ -191,6 +221,9 @@ public class DeletionInfo
         return this;
     }
 
+    /**
+     * Returns the minimum timestamp in any of the range tombstones or the top-level tombstone.
+     */
     public long minTimestamp()
     {
         return ranges == null
@@ -199,7 +232,7 @@ public class DeletionInfo
     }
 
     /**
-     * The maximum timestamp mentioned by this DeletionInfo.
+     * Returns the maximum timestamp in any of the range tombstones or the top-level tombstone.
      */
     public long maxTimestamp()
     {
@@ -208,6 +241,9 @@ public class DeletionInfo
              : Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt());
     }
 
+    /**
+     * Returns the top-level (or "row") tombstone.
+     */
     public DeletionTime getTopLevelDeletion()
     {
         return topLevel;
@@ -326,7 +362,7 @@ public class DeletionInfo
 
     /**
      * This object allow testing whether a given column (name/timestamp) is deleted
-     * or not by this DeletionInfo, assuming that the column given to this
+     * or not by this DeletionInfo, assuming that the columns given to this
      * object are passed in forward or reversed comparator sorted order.
      *
      * This is more efficient that calling DeletionInfo.isDeleted() repeatedly
@@ -336,9 +372,9 @@ public class DeletionInfo
     {
         /*
          * Note that because because range tombstone are added to this DeletionInfo while we iterate,
-         * ranges may be null initially and we need to wait the first range to create the tester (once
-         * created the test will pick up new tombstones however). We do are guaranteed that a range tombstone
-         * will be added *before* we test any column that it may delete so this is ok.
+         * `ranges` may be null initially and we need to wait for the first range to create the tester (once
+         * created the test will pick up new tombstones however). We are guaranteed that a range tombstone
+         * will be added *before* we test any column that it may delete, so this is ok.
          */
         private RangeTombstoneList.InOrderTester tester;
         private final boolean reversed;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 3d6fad4..b80422c 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -27,11 +27,27 @@ import com.google.common.base.Objects;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.utils.ObjectSizes;
 
+/**
+ * A top-level (row) tombstone.
+ */
 public class DeletionTime implements Comparable<DeletionTime>
 {
+    /**
+     * A special DeletionTime that signifies that there is no top-level (row) tombstone.
+     */
     public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
 
+    /**
+     * A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which
+     * data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked
+     * for deletion at all.
+     */
     public final long markedForDeleteAt;
+
+    /**
+     * The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is
+     * only used for purposes of purging the tombstone after gc_grace_seconds have elapsed.
+     */
     public final int localDeletionTime;
 
     public static final ISerializer<DeletionTime> serializer = new Serializer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index fe61916..dad9004 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -305,7 +305,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
     /**
      * Returns whether {@code purge(gcBefore)} would remove something or not.
      */
-    public boolean hasIrrelevantData(int gcBefore)
+    public boolean hasPurgeableTombstones(int gcBefore)
     {
         for (int i = 0; i < size; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0cdbbb7..3b7a3d4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -59,6 +59,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
     private long maxTombstoneTimestamp;
+    private DeletionInfo deletionInfo;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,17 +68,15 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        ColumnFamily rawCf = null;
+        // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
+        // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
+        deletionInfo = DeletionInfo.live();
         maxTombstoneTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
-            ColumnFamily cf = row.getColumnFamily();
-            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
-
-            if (rawCf == null)
-                rawCf = cf;
-            else
-                rawCf.delete(cf);
+            DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
+            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
+            deletionInfo = deletionInfo.add(delInfo);
         }
 
         // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
@@ -86,12 +85,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // no other versions of this row present.
         this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
 
-        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
-        // to get rid of redundant row-level and range tombstones
-        assert rawCf != null;
-        int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
-        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
-        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+        emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
+        emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
+        if (shouldPurge)
+            emptyColumnFamily.purgeTombstones(controller.gcBefore);
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -103,14 +100,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
             columnsIndex = indexBuilder.buildForCompaction(iterator());
-            if (columnsIndex.columnsIndex.isEmpty())
-            {
-                boolean cfIrrelevant = shouldPurge
-                                       ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                                       : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
-                if (cfIrrelevant)
-                    return null;
-            }
+
+            // if there aren't any columns or tombstones, return null
+            if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
+                return null;
         }
         catch (IOException e)
         {
@@ -128,7 +121,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         );
         reducer = null;
 
+        // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
         indexBuilder.maybeWriteEmptyRowHeader();
+
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
@@ -201,7 +196,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // in the container; we just want to leverage the conflict resolution code from CF
         ColumnFamily container = emptyColumnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
 
-        // tombstone reference; will be reconciled w/ column during getReduced
+        // tombstone reference; will be reconciled w/ column during getReduced.  Note that the top-level (row) tombstone
+        // is held by LCR.deletionInfo.
         RangeTombstone tombstone;
 
         int columns = 0;
@@ -212,11 +208,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
         List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
 
+        /**
+         * Called once per version of a cell that we need to merge, after which getReduced() is called.  In other words,
+         * this will be called one or more times with cells that share the same column name.
+         */
         public void reduce(OnDiskAtom current)
         {
             if (current instanceof RangeTombstone)
             {
-                tombstone = (RangeTombstone)current;
+                if (tombstone == null || current.maxTimestamp() >= tombstone.maxTimestamp())
+                    tombstone = (RangeTombstone)current;
             }
             else
             {
@@ -235,6 +236,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             }
         }
 
+        /**
+         * Called after reduce() has been called for each cell sharing the same name.
+         */
         protected OnDiskAtom getReduced()
         {
             if (tombstone != null)
@@ -253,6 +257,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             }
             else
             {
+                // when we clear() the container, it removes the deletion info, so this needs to be reset each time
+                container.setDeletionInfo(deletionInfo);
                 ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
                 if (purged == null || !purged.iterator().hasNext())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index b4a375a..a71dc48 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -245,12 +245,12 @@ public class Util
             assertTrue(ss.getTokenMetadata().isMember(hosts.get(i)));
     }
 
-    public static Future<?> compactAll(ColumnFamilyStore cfs)
+    public static Future<?> compactAll(ColumnFamilyStore cfs, int gcBefore)
     {
-        List<Descriptor> descriptors = new ArrayList<Descriptor>();
+        List<Descriptor> descriptors = new ArrayList<>();
         for (SSTableReader sstable : cfs.getSSTables())
             descriptors.add(sstable.descriptor);
-        return CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE);
+        return CompactionManager.instance.submitUserDefined(cfs, descriptors, gcBefore);
     }
 
     public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index fd685b4..1f41860 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -145,7 +145,7 @@ public class KeyCacheTest extends SchemaLoader
 
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
 
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         // after compaction cache should have entries for
         // new SSTables, if we had 2 keys in cache previously it should become 4
         assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 48c0b3c..8461023 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -21,12 +21,12 @@ package org.apache.cassandra.db.compaction;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.ExecutionException;
-
-import org.junit.Assert;
+import java.util.concurrent.Future;
 
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.DecoratedKey;
@@ -38,7 +38,11 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.Util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 
@@ -144,7 +148,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         // verify that minor compaction still GC when key is present
         // in a non-compacted sstable but the timestamp ensures we won't miss anything
         cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
-        Assert.assertEquals(1, cf.getColumnCount());
+        assertEquals(1, cf.getColumnCount());
     }
 
     @Test
@@ -181,8 +185,8 @@ public class CompactionsPurgeTest extends SchemaLoader
         // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
         // so it would be invalid to assume we can throw out the c1 entry.
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
-        Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
-        Assert.assertEquals(2, cf.getColumnCount());
+        assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
+        assertEquals(2, cf.getColumnCount());
     }
 
     @Test
@@ -216,7 +220,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
 
         // compact and test that the row is completely gone
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         assert cfs.getSSTables().isEmpty();
         ColumnFamily cf = keyspace.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assert cf == null : cf;
@@ -253,7 +257,7 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         // flush and major compact
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
 
         // re-inserts with timestamp lower than delete
         rm = new RowMutation(keyspaceName, key.key);
@@ -282,6 +286,7 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         DecoratedKey key = Util.dk("key3");
         RowMutation rm;
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis());
 
         // inserts
         rm = new RowMutation(keyspaceName, key.key);
@@ -295,10 +300,13 @@ public class CompactionsPurgeTest extends SchemaLoader
         rm = new RowMutation(keyspaceName, key.key);
         rm.delete(cfName, 4);
         rm.apply();
+        ColumnFamily cf = cfs.getColumnFamily(filter);
+        assertTrue(cf.isMarkedForDelete());
 
         // flush and major compact (with tombstone purging)
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
+        assertFalse(cfs.getColumnFamily(filter).isMarkedForDelete());
 
         // re-inserts with timestamp lower than delete
         rm = new RowMutation(keyspaceName, key.key);
@@ -308,10 +316,66 @@ public class CompactionsPurgeTest extends SchemaLoader
         }
         rm.apply();
 
-        // Check that the second insert did went in
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
+        // Check that the second insert went in
+        cf = cfs.getColumnFamily(filter);
         assertEquals(10, cf.getColumnCount());
         for (Column c : cf)
             assert !c.isMarkedForDelete(System.currentTimeMillis());
     }
+
+    @Test
+    public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException
+    {
+        String keyspace = "cql_keyspace";
+        String table = "table1";
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        cfs.disableAutoCompaction();
+
+        // write a row out to one sstable
+        processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                      keyspace, table, 1, "foo", 1));
+        cfs.forceBlockingFlush();
+
+        UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(1, result.size());
+
+        // write a row tombstone out to a second sstable
+        processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        cfs.forceBlockingFlush();
+
+        // basic check that the row is considered deleted
+        assertEquals(2, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(0, result.size());
+
+        // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
+        Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
+        future.get();
+
+        // the data should be gone, but the tombstone should still exist
+        assertEquals(1, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(0, result.size());
+
+        // write a row out to one sstable
+        processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                      keyspace, table, 1, "foo", 1));
+        cfs.forceBlockingFlush();
+        assertEquals(2, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(1, result.size());
+
+        // write a row tombstone out to a different sstable
+        processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        cfs.forceBlockingFlush();
+
+        // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
+        future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
+        future.get();
+
+        // both the data and the tombstone should be gone this time
+        assertEquals(0, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(0, result.size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7787314..7622728 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -136,7 +136,7 @@ public class StreamingTransferTest extends SchemaLoader
         for (int i = 1; i <= 3; i++)
             mutator.mutate("key" + i, "col" + i, timestamp);
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         assertEquals(1, cfs.getSSTables().size());
 
         // transfer the first and last key
@@ -468,7 +468,7 @@ public class StreamingTransferTest extends SchemaLoader
         for (int i = 1; i <= 6000; i++)
             mutator.mutate("key" + i, "col" + i, System.currentTimeMillis());
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         cfs.clearUnsafe();
 


[3/3] git commit: fix stats to omit purged row tombstone

Posted by jb...@apache.org.
fix stats to omit purged row tombstone


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d8da2ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d8da2ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d8da2ee

Branch: refs/heads/cassandra-2.0
Commit: 0d8da2ee3c9de7b890b5630c3e0c74b8c80e63dc
Parents: 4e9a7b8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:35:41 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:35:41 2013 +0600

----------------------------------------------------------------------
 .../org/apache/cassandra/db/compaction/LazilyCompactedRow.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d8da2ee/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0d33b22..0ad3de2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -112,7 +112,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxRowTombstone.markedForDeleteAt : Math.max(maxRowTombstone.markedForDeleteAt, reducer.maxTimestampSeen),
+                                      reducer == null ? emptyColumnFamily.maxTimestamp() : Math.max(emptyColumnFamily.maxTimestamp(), reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,


[2/3] git commit: clarify that we only collect row-level tombstone in LCR constructor

Posted by jb...@apache.org.
clarify that we only collect row-level tombstone in LCR constructor


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e9a7b8c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e9a7b8c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e9a7b8c

Branch: refs/heads/cassandra-2.0
Commit: 4e9a7b8c7fa55df9cda4ac06f77ee9c69b85314d
Parents: 3edb62b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 12 23:43:59 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:26:14 2013 +0600

----------------------------------------------------------------------
 .../db/compaction/LazilyCompactedRow.java       | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e9a7b8c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 3b7a3d4..0d33b22 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -58,8 +58,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxTombstoneTimestamp;
-    private DeletionInfo deletionInfo;
+    private DeletionTime maxRowTombstone;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -70,23 +69,23 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
         // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
         // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
-        deletionInfo = DeletionInfo.live();
-        maxTombstoneTimestamp = Long.MIN_VALUE;
+        maxRowTombstone = DeletionTime.LIVE;
         for (OnDiskAtomIterator row : rows)
         {
-            DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
-            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
-            deletionInfo = deletionInfo.add(delInfo);
+            DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion();
+            if (maxRowTombstone.compareTo(rowTombstone) < 0)
+                maxRowTombstone = rowTombstone;
         }
 
+
         // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
         // tombstones newer than the row-level tombstones we've seen -- but we won't know that
         // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
         // no other versions of this row present.
         this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
 
-        emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-        emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
+        emptyColumnFamily = EmptyColumns.factory.create(controller.cfs.metadata);
+        emptyColumnFamily.delete(maxRowTombstone);
         if (shouldPurge)
             emptyColumnFamily.purgeTombstones(controller.gcBefore);
     }
@@ -113,7 +112,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
+                                      reducer == null ? maxRowTombstone.markedForDeleteAt : Math.max(maxRowTombstone.markedForDeleteAt, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -193,8 +192,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
     {
         // all columns reduced together will have the same name, so there will only be one column
-        // in the container; we just want to leverage the conflict resolution code from CF
-        ColumnFamily container = emptyColumnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
+        // in the container; we just want to leverage the conflict resolution code from CF.
+        // (Note that we add the row tombstone in getReduced.)
+        final ColumnFamily container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata());
 
         // tombstone reference; will be reconciled w/ column during getReduced.  Note that the top-level (row) tombstone
         // is held by LCR.deletionInfo.
@@ -258,7 +258,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             else
             {
                 // when we clear() the container, it removes the deletion info, so this needs to be reset each time
-                container.setDeletionInfo(deletionInfo);
+                container.delete(maxRowTombstone);
                 ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
                 if (purged == null || !purged.iterator().hasNext())
                 {