Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/12 19:47:06 UTC

[1/8] git commit: Fix row tombstones in larger-than-memory compactions patch by thobbs; reviewed by jbellis for CASSANDRA-6008

Updated Branches:
  refs/heads/trunk d16d5c4f2 -> a9b93c257


Fix row tombstones in larger-than-memory compactions
patch by thobbs; reviewed by jbellis for CASSANDRA-6008


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3edb62bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3edb62bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3edb62bf

Branch: refs/heads/trunk
Commit: 3edb62bf773617aeb3a348edc5667a6b0bad0ffe
Parents: e6eb550
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 12 23:28:13 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:17:33 2013 +0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/AbstractThreadUnsafeSortedColumns.java   |  6 +-
 .../cassandra/db/AtomicSortedColumns.java       |  4 +-
 .../org/apache/cassandra/db/ColumnFamily.java   | 11 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 21 ++++-
 .../org/apache/cassandra/db/ColumnIndex.java    |  2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   | 76 +++++++++++++-----
 .../org/apache/cassandra/db/DeletionTime.java   | 16 ++++
 .../apache/cassandra/db/RangeTombstoneList.java |  2 +-
 .../db/compaction/LazilyCompactedRow.java       | 54 +++++++------
 test/unit/org/apache/cassandra/Util.java        |  6 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  2 +-
 .../db/compaction/CompactionsPurgeTest.java     | 84 +++++++++++++++++---
 .../streaming/StreamingTransferTest.java        |  4 +-
 14 files changed, 220 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 30f863e..d573e37 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.4
+ * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
  * Fix cleanup ClassCastException (CASSANDRA-6462)
  * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
  * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index 1b245eb..36b051b 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -59,7 +59,11 @@ public abstract class AbstractThreadUnsafeSortedColumns extends ColumnFamily
         deletionInfo = newInfo;
     }
 
-    public void maybeResetDeletionTimes(int gcBefore)
+    /**
+     * Purges any tombstones with a local deletion time before gcBefore.
+     * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
+     */
+    public void purgeTombstones(int gcBefore)
     {
         deletionInfo.purge(gcBefore);
     }
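
For context, gcBefore is a purge horizon expressed in seconds since the epoch. A minimal sketch of how such a value is typically derived (the gc_grace_seconds figure below is a hypothetical table setting, not taken from this patch):

    // "now" in seconds, minus the table's gc_grace_seconds
    int gcGraceSeconds = 864000; // hypothetical: 10 days
    int gcBefore = (int) (System.currentTimeMillis() / 1000) - gcGraceSeconds;
    cf.purgeTombstones(gcBefore); // drops tombstones created before that horizon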

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index f6a6b83..b44d8bf 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -120,12 +120,12 @@ public class AtomicSortedColumns extends ColumnFamily
         ref.set(ref.get().with(newInfo));
     }
 
-    public void maybeResetDeletionTimes(int gcBefore)
+    public void purgeTombstones(int gcBefore)
     {
         while (true)
         {
             Holder current = ref.get();
-            if (!current.deletionInfo.hasIrrelevantData(gcBefore))
+            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
                 break;
 
             DeletionInfo purgedInfo = current.deletionInfo.copy();
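
The loop above is the usual copy-then-compareAndSet retry: purge a private copy, then publish it only if no concurrent writer replaced the shared reference in the meantime. A sketch of the same shape, assuming `ref` is an AtomicReference<DeletionInfo> (the Holder indirection of the real class is elided):

    while (true)
    {
        DeletionInfo current = ref.get();                // read shared state
        if (!current.hasPurgeableTombstones(gcBefore))
            break;                                       // nothing to purge
        DeletionInfo purged = current.copy();            // mutate a private copy
        purged.purge(gcBefore);
        if (ref.compareAndSet(current, purged))
            break;                                       // published; else retry
    }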

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 47b14b9..2c00071 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -185,7 +185,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public abstract void delete(DeletionTime deletionTime);
     protected abstract void delete(RangeTombstone tombstone);
 
-    public abstract void maybeResetDeletionTimes(int gcBefore);
+    /**
+     * Purges top-level and range tombstones whose localDeletionTime is older than gcBefore.
+     * @param gcBefore a timestamp (in seconds) before which tombstones should be purged
+     */
+    public abstract void purgeTombstones(int gcBefore);
 
     /**
      * Adds a column to this column map.
@@ -268,6 +272,9 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
      */
     public abstract boolean isInsertReversed();
 
+    /**
+     * If `columns` has any tombstones (top-level or range tombstones), they will be applied to this set of columns.
+     */
     public void delete(ColumnFamily columns)
     {
         delete(columns.deletionInfo());
@@ -459,7 +466,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public boolean hasIrrelevantData(int gcBefore)
     {
         // Do we have gcable deletion infos?
-        if (deletionInfo().hasIrrelevantData(gcBefore))
+        if (deletionInfo().hasPurgeableTombstones(gcBefore))
             return true;
 
        // Do we have columns that are either deleted by the container or are gcable tombstones?

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index eb715ac..d585407 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -862,12 +862,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    /**
+     * Purges gc-able top-level and range tombstones, returning `cf` if there are any columns or tombstones left,
+     * null otherwise.
+     * @param gcBefore a timestamp (in seconds); tombstones with a localDeletionTime before this will be purged
+     */
     public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
     {
-        cf.maybeResetDeletionTimes(gcBefore);
+        // purge old top-level and range tombstones
+        cf.purgeTombstones(gcBefore);
+
+        // if there are no columns or tombstones left, return null
         return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf;
     }
 
+    /**
+     * Removes deleted columns and purges gc-able tombstones.
+     * @return an updated `cf` if any columns or tombstones remain, null otherwise
+     */
     public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
     {
         return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater);
@@ -890,7 +902,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return removeDeletedCF(cf, gcBefore);
     }
 
-    private static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+    /**
+     * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
+     * columns that have been dropped from the schema (for CQL3 tables only).
+     * @return the updated ColumnFamily
+     */
+    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
         Iterator<Column> iter = cf.iterator();
         DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
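
Taken together, removeDeleted() amounts to composing the two helpers above: strip dead and shadowed cells first, then purge row/range tombstones, collapsing to null if nothing remains. The trunk merge later in this thread spells this out:

    return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore);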

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 75a06d7..f6b38b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -67,7 +67,7 @@ public class ColumnIndex
         private final RangeTombstone.Tracker tombstoneTracker;
         private int atomCount;
         private final ByteBuffer key;
-        private final DeletionInfo deletionInfo;
+        private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
 
         public Builder(ColumnFamily cf,
                        ByteBuffer key,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 4e1d68d..13fc824 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -29,15 +29,32 @@ import com.google.common.collect.Iterators;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
 
+/**
+ * A combination of a top-level (or row) tombstone and range tombstones describing the deletions
+ * within a {@link ColumnFamily} (or row).
+ */
 public class DeletionInfo
 {
     private static final Serializer serializer = new Serializer();
 
-    // We don't have way to represent the full interval of keys (Interval don't support the minimum token as the right bound),
-    // so we keep the topLevel deletion info separatly. This also slightly optimize the case of full row deletion which is rather common.
+    /**
+     * This represents a deletion of the entire row.  We can't represent this within the RangeTombstoneList, so it's
+     * kept separately.  This also slightly optimizes the common case of a full row deletion.
+     */
     private DeletionTime topLevel;
-    private RangeTombstoneList ranges; // null if no range tombstones (to save an allocation since it's a common case).
 
+    /**
+     * A list of range tombstones within the row.  This is left as null if there are no range tombstones
+     * (to save an allocation, since it's a common case).
+     */
+    private RangeTombstoneList ranges;
+
+    /**
+     * Creates a DeletionInfo with only a top-level (row) tombstone.
+     * @param markedForDeleteAt the time after which the entire row should be considered deleted
+     * @param localDeletionTime the local server time (in seconds) at which the deletion was applied (used to
+     *                          purge the tombstone after gc_grace_seconds have elapsed).
+     */
     public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
     {
         // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
@@ -61,17 +78,20 @@ public class DeletionInfo
         this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
     }
 
-    public static DeletionInfo live()
-    {
-        return new DeletionInfo(DeletionTime.LIVE);
-    }
-
     private DeletionInfo(DeletionTime topLevel, RangeTombstoneList ranges)
     {
         this.topLevel = topLevel;
         this.ranges = ranges;
     }
 
+    /**
+     * Returns a new DeletionInfo that has no top-level tombstone or any range tombstones.
+     */
+    public static DeletionInfo live()
+    {
+        return new DeletionInfo(DeletionTime.LIVE);
+    }
+
     public static Serializer serializer()
     {
         return serializer;
@@ -93,8 +113,7 @@ public class DeletionInfo
     }
 
     /**
-     * Return whether a given column is deleted by the container having this
-     * deletion info.
+     * Return whether a given column is deleted by the container having this deletion info.
      *
      * @param column the column to check.
      * @return true if the column is deleted, false otherwise
@@ -137,8 +156,7 @@ public class DeletionInfo
     /**
      * Purges every tombstone that is older than {@code gcBefore}.
      *
-     * @param gcBefore timestamp (in seconds) before which tombstones should
-     * be purged
+     * @param gcBefore timestamp (in seconds) before which tombstones should be purged
      */
     public void purge(int gcBefore)
     {
@@ -152,14 +170,24 @@ public class DeletionInfo
         }
     }
 
-    public boolean hasIrrelevantData(int gcBefore)
+    /**
+     * Returns true if {@code purge} would remove the top-level tombstone or any of the range
+     * tombstones, false otherwise.
+     * @param gcBefore timestamp (in seconds) before which tombstones should be purged
+     */
+    public boolean hasPurgeableTombstones(int gcBefore)
     {
         if (topLevel.localDeletionTime < gcBefore)
             return true;
 
-        return ranges != null && ranges.hasIrrelevantData(gcBefore);
+        return ranges != null && ranges.hasPurgeableTombstones(gcBefore);
     }
 
+    /**
+     * Potentially replaces the top-level tombstone with another, keeping whichever has the higher markedForDeleteAt
+     * timestamp.
+     * @param newInfo the top-level tombstone to merge with the current one
+     */
     public void add(DeletionTime newInfo)
     {
         if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt)
@@ -175,7 +203,9 @@ public class DeletionInfo
     }
 
     /**
-     * Adds the provided deletion infos to the current ones.
+     * Combines another DeletionInfo with this one and returns the result.  Whichever top-level tombstone
+     * has the higher markedForDeleteAt timestamp will be kept, along with its localDeletionTime.  The
+     * range tombstones will be combined.
      *
      * @return this object.
      */
@@ -191,6 +221,9 @@ public class DeletionInfo
         return this;
     }
 
+    /**
+     * Returns the minimum timestamp in any of the range tombstones or the top-level tombstone.
+     */
     public long minTimestamp()
     {
         return ranges == null
@@ -199,7 +232,7 @@ public class DeletionInfo
     }
 
     /**
-     * The maximum timestamp mentioned by this DeletionInfo.
+     * Returns the maximum timestamp in any of the range tombstones or the top-level tombstone.
      */
     public long maxTimestamp()
     {
@@ -208,6 +241,9 @@ public class DeletionInfo
              : Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt());
     }
 
+    /**
+     * Returns the top-level (or "row") tombstone.
+     */
     public DeletionTime getTopLevelDeletion()
     {
         return topLevel;
@@ -326,7 +362,7 @@ public class DeletionInfo
 
     /**
      * This object allows testing whether a given column (name/timestamp) is deleted
-     * or not by this DeletionInfo, assuming that the column given to this
+     * or not by this DeletionInfo, assuming that the columns given to this
      * object are passed in forward or reversed comparator sorted order.
      *
      * This is more efficient than calling DeletionInfo.isDeleted() repeatedly
@@ -336,9 +372,9 @@ public class DeletionInfo
     {
         /*
         * Note that because range tombstones are added to this DeletionInfo while we iterate,
-         * ranges may be null initially and we need to wait the first range to create the tester (once
-         * created the test will pick up new tombstones however). We do are guaranteed that a range tombstone
-         * will be added *before* we test any column that it may delete so this is ok.
+         * `ranges` may be null initially and we need to wait for the first range to create the tester (once
+         * created, the tester will pick up new tombstones). We are guaranteed that a range tombstone
+         * will be added *before* we test any column that it may delete, so this is ok.
          */
         private RangeTombstoneList.InOrderTester tester;
         private final boolean reversed;
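
To make the add(DeletionTime) rule above concrete: whichever top-level tombstone has the higher markedForDeleteAt wins, and its localDeletionTime comes along with it. An illustrative snippet (all values made up):

    DeletionInfo info = new DeletionInfo(1000L, 100);   // markedForDeleteAt=1000
    info.add(new DeletionTime(2000L, 90));              // 2000 > 1000, so this wins
    // getTopLevelDeletion() is now (2000, 90); adding (1500, 95) would change nothing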

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 3d6fad4..b80422c 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -27,11 +27,27 @@ import com.google.common.base.Objects;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.utils.ObjectSizes;
 
+/**
+ * A top-level (row) tombstone.
+ */
 public class DeletionTime implements Comparable<DeletionTime>
 {
+    /**
+     * A special DeletionTime that signifies that there is no top-level (row) tombstone.
+     */
     public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
 
+    /**
+     * A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which
+     * data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked
+     * for deletion at all.
+     */
     public final long markedForDeleteAt;
+
+    /**
+     * The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is
+     * only used for purposes of purging the tombstone after gc_grace_seconds have elapsed.
+     */
     public final int localDeletionTime;
 
     public static final ISerializer<DeletionTime> serializer = new Serializer();
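
To make the two fields concrete: markedForDeleteAt is compared against data timestamps to decide what the tombstone shadows, while localDeletionTime is compared against gcBefore to decide when the tombstone itself may be purged. An illustrative check (values made up):

    DeletionTime dt = new DeletionTime(1386800000000000L, 1386800000); // micros, seconds
    int gcBefore = 1386864000;                     // hypothetical now - gc_grace_seconds
    boolean purgeable = dt.localDeletionTime < gcBefore;       // tombstone may be dropped
    long cellTimestamp = 1386700000000000L;
    boolean shadowed = cellTimestamp <= dt.markedForDeleteAt;  // cell is deleted by it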

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index fe61916..dad9004 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -305,7 +305,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
     /**
      * Returns whether {@code purge(gcBefore)} would remove something or not.
      */
-    public boolean hasIrrelevantData(int gcBefore)
+    public boolean hasPurgeableTombstones(int gcBefore)
     {
         for (int i = 0; i < size; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0cdbbb7..3b7a3d4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -59,6 +59,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
     private long maxTombstoneTimestamp;
+    private DeletionInfo deletionInfo;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,17 +68,15 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        ColumnFamily rawCf = null;
+        // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
+        // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
+        deletionInfo = DeletionInfo.live();
         maxTombstoneTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
-            ColumnFamily cf = row.getColumnFamily();
-            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
-
-            if (rawCf == null)
-                rawCf = cf;
-            else
-                rawCf.delete(cf);
+            DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
+            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
+            deletionInfo = deletionInfo.add(delInfo);
         }
 
         // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
@@ -86,12 +85,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // no other versions of this row present.
         this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
 
-        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
-        // to get rid of redundant row-level and range tombstones
-        assert rawCf != null;
-        int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
-        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
-        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+        emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
+        emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
+        if (shouldPurge)
+            emptyColumnFamily.purgeTombstones(controller.gcBefore);
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -103,14 +100,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
             columnsIndex = indexBuilder.buildForCompaction(iterator());
-            if (columnsIndex.columnsIndex.isEmpty())
-            {
-                boolean cfIrrelevant = shouldPurge
-                                       ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                                       : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
-                if (cfIrrelevant)
-                    return null;
-            }
+
+            // if there aren't any columns or tombstones, return null
+            if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
+                return null;
         }
         catch (IOException e)
         {
@@ -128,7 +121,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         );
         reducer = null;
 
+        // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
         indexBuilder.maybeWriteEmptyRowHeader();
+
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
@@ -201,7 +196,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // in the container; we just want to leverage the conflict resolution code from CF
         ColumnFamily container = emptyColumnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
 
-        // tombstone reference; will be reconciled w/ column during getReduced
+        // tombstone reference; will be reconciled w/ column during getReduced.  Note that the top-level (row) tombstone
+        // is held by LCR.deletionInfo.
         RangeTombstone tombstone;
 
         int columns = 0;
@@ -212,11 +208,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
         List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
 
+        /**
+         * Called once per version of a cell that we need to merge; it will be called one or more times with
+         * cells that share the same column name, after which getReduced() is called once.
+         */
         public void reduce(OnDiskAtom current)
         {
             if (current instanceof RangeTombstone)
             {
-                tombstone = (RangeTombstone)current;
+                if (tombstone == null || current.maxTimestamp() >= tombstone.maxTimestamp())
+                    tombstone = (RangeTombstone)current;
             }
             else
             {
@@ -235,6 +236,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             }
         }
 
+        /**
+         * Called after reduce() has been called for each cell sharing the same name.
+         */
         protected OnDiskAtom getReduced()
         {
             if (tombstone != null)
@@ -253,6 +257,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             }
             else
             {
+                // when we clear() the container, it removes the deletion info, so this needs to be reset each time
+                container.setDeletionInfo(deletionInfo);
                 ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
                 if (purged == null || !purged.iterator().hasNext())
                 {
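
For readers new to this class, reduce() and getReduced() implement a standard merge-iterator reducer: reduce() is fed every version of a cell, and getReduced() emits the merged survivor. A stripped-down sketch of the contract (names illustrative; the real code also handles range tombstones, purging, and counters):

    private Column merged;               // accumulates versions of one cell name

    public void reduce(Column current)
    {
        // called once per input version of the same cell
        merged = merged == null ? current : merged.reconcile(current);
    }

    protected Column getReduced()
    {
        // called once per cell name, after every version has been reduced
        Column result = merged;
        merged = null;                   // reset for the next cell name
        return result;
    }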

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index b4a375a..a71dc48 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -245,12 +245,12 @@ public class Util
             assertTrue(ss.getTokenMetadata().isMember(hosts.get(i)));
     }
 
-    public static Future<?> compactAll(ColumnFamilyStore cfs)
+    public static Future<?> compactAll(ColumnFamilyStore cfs, int gcBefore)
     {
-        List<Descriptor> descriptors = new ArrayList<Descriptor>();
+        List<Descriptor> descriptors = new ArrayList<>();
         for (SSTableReader sstable : cfs.getSSTables())
             descriptors.add(sstable.descriptor);
-        return CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE);
+        return CompactionManager.instance.submitUserDefined(cfs, descriptors, gcBefore);
     }
 
     public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index fd685b4..1f41860 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -145,7 +145,7 @@ public class KeyCacheTest extends SchemaLoader
 
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
 
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         // after compaction cache should have entries for
         // new SSTables, if we had 2 keys in cache previously it should become 4
         assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 48c0b3c..8461023 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -21,12 +21,12 @@ package org.apache.cassandra.db.compaction;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.ExecutionException;
-
-import org.junit.Assert;
+import java.util.concurrent.Future;
 
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.DecoratedKey;
@@ -38,7 +38,11 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.Util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 
@@ -144,7 +148,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         // verify that minor compaction still GC when key is present
         // in a non-compacted sstable but the timestamp ensures we won't miss anything
         cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
-        Assert.assertEquals(1, cf.getColumnCount());
+        assertEquals(1, cf.getColumnCount());
     }
 
     @Test
@@ -181,8 +185,8 @@ public class CompactionsPurgeTest extends SchemaLoader
         // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
         // so it would be invalid to assume we can throw out the c1 entry.
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
-        Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
-        Assert.assertEquals(2, cf.getColumnCount());
+        assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
+        assertEquals(2, cf.getColumnCount());
     }
 
     @Test
@@ -216,7 +220,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
 
         // compact and test that the row is completely gone
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         assert cfs.getSSTables().isEmpty();
         ColumnFamily cf = keyspace.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assert cf == null : cf;
@@ -253,7 +257,7 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         // flush and major compact
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
 
         // re-inserts with timestamp lower than delete
         rm = new RowMutation(keyspaceName, key.key);
@@ -282,6 +286,7 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         DecoratedKey key = Util.dk("key3");
         RowMutation rm;
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis());
 
         // inserts
         rm = new RowMutation(keyspaceName, key.key);
@@ -295,10 +300,13 @@ public class CompactionsPurgeTest extends SchemaLoader
         rm = new RowMutation(keyspaceName, key.key);
         rm.delete(cfName, 4);
         rm.apply();
+        ColumnFamily cf = cfs.getColumnFamily(filter);
+        assertTrue(cf.isMarkedForDelete());
 
         // flush and major compact (with tombstone purging)
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
+        assertFalse(cfs.getColumnFamily(filter).isMarkedForDelete());
 
         // re-inserts with timestamp lower than delete
         rm = new RowMutation(keyspaceName, key.key);
@@ -308,10 +316,66 @@ public class CompactionsPurgeTest extends SchemaLoader
         }
         rm.apply();
 
-        // Check that the second insert did went in
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
+        // Check that the second insert went in
+        cf = cfs.getColumnFamily(filter);
         assertEquals(10, cf.getColumnCount());
         for (Column c : cf)
             assert !c.isMarkedForDelete(System.currentTimeMillis());
     }
+
+    @Test
+    public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException
+    {
+        String keyspace = "cql_keyspace";
+        String table = "table1";
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        cfs.disableAutoCompaction();
+
+        // write a row out to one sstable
+        processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                      keyspace, table, 1, "foo", 1));
+        cfs.forceBlockingFlush();
+
+        UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(1, result.size());
+
+        // write a row tombstone out to a second sstable
+        processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        cfs.forceBlockingFlush();
+
+        // basic check that the row is considered deleted
+        assertEquals(2, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(0, result.size());
+
+        // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
+        Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
+        future.get();
+
+        // the data should be gone, but the tombstone should still exist
+        assertEquals(1, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(0, result.size());
+
+        // write a row out to one sstable
+        processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                      keyspace, table, 1, "foo", 1));
+        cfs.forceBlockingFlush();
+        assertEquals(2, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(1, result.size());
+
+        // write a row tombstone out to a different sstable
+        processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        cfs.forceBlockingFlush();
+
+        // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
+        future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
+        future.get();
+
+        // both the data and the tombstone should be gone this time
+        assertEquals(0, cfs.getSSTables().size());
+        result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+        assertEquals(0, result.size());
+    }
 }
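
The two submitMaximal() calls in the new test differ only in their gcBefore bound, which is what flips the purge decision:

    // ~2.8 hours in the past: the just-written tombstone's localDeletionTime is
    // after this bound, so it must survive the compaction
    int keepBound = (int) (System.currentTimeMillis() / 1000) - 10000;
    // ~2.8 hours in the future: every existing localDeletionTime falls before
    // this bound, so the row tombstone is eligible for purging
    int purgeBound = (int) (System.currentTimeMillis() / 1000) + 10000;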

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3edb62bf/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7787314..7622728 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -136,7 +136,7 @@ public class StreamingTransferTest extends SchemaLoader
         for (int i = 1; i <= 3; i++)
             mutator.mutate("key" + i, "col" + i, timestamp);
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         assertEquals(1, cfs.getSSTables().size());
 
         // transfer the first and last key
@@ -468,7 +468,7 @@ public class StreamingTransferTest extends SchemaLoader
         for (int i = 1; i <= 6000; i++)
             mutator.mutate("key" + i, "col" + i, System.currentTimeMillis());
         cfs.forceBlockingFlush();
-        Util.compactAll(cfs).get();
+        Util.compactAll(cfs, Integer.MAX_VALUE).get();
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         cfs.clearUnsafe();
 


[4/8] git commit: merge from 2.0

Posted by jb...@apache.org.
merge from 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0bfa210d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0bfa210d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0bfa210d

Branch: refs/heads/trunk
Commit: 0bfa210d071b664b37d6ba5ee4eda280f47d7b0e
Parents: d53c838 4e9a7b8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:34:09 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:34:09 2013 +0600

----------------------------------------------------------------------
 .../db/compaction/LazilyCompactedRow.java       | 30 ++++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bfa210d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 8237ff5,0d33b22..23457bc
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@@ -56,9 -58,7 +56,9 @@@ public class LazilyCompactedRow extend
      private boolean closed;
      private ColumnIndex.Builder indexBuilder;
      private final SecondaryIndexManager.Updater indexer;
 +    private final Reducer reducer;
 +    private final Iterator<OnDiskAtom> merger;
-     private DeletionInfo deletionInfo;
+     private DeletionTime maxRowTombstone;
  
      public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
      {
@@@ -69,36 -69,25 +69,40 @@@
  
          // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
          // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
-         deletionInfo = DeletionInfo.live();
+         maxRowTombstone = DeletionTime.LIVE;
          for (OnDiskAtomIterator row : rows)
-             deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo());
+         {
+             DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion();
+             if (maxRowTombstone.compareTo(rowTombstone) < 0)
+                 maxRowTombstone = rowTombstone;
+         }
  
 -
 -        // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
 -        // tombstones newer than the row-level tombstones we've seen -- but we won't know that
 -        // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
 -        // no other versions of this row present.
 -        this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
 +        // tombstones with a timestamp before this can be purged.  This is the minimum timestamp for any sstable
 +        // containing `key` outside of the set of sstables involved in this compaction.
 +        maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
  
-         emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-         emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
-         if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp)
+         emptyColumnFamily = EmptyColumns.factory.create(controller.cfs.metadata);
+         emptyColumnFamily.delete(maxRowTombstone);
 -        if (shouldPurge)
++        if (maxRowTombstone.markedForDeleteAt < maxPurgeableTimestamp)
              emptyColumnFamily.purgeTombstones(controller.gcBefore);
 +
 +        reducer = new Reducer();
 +        merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
 +    }
 +
 +    private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
 +    {
 +        // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
 +        // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
 +        // without purging tombstones.
 +        int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
 +        ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
 +
 +        // if we have counters, remove old shards
 +        if (cf.metadata().getDefaultValidator().isCommutative())
 +            CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
 +
 +        return cf;
      }
  
      public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@@ -252,19 -257,18 +257,20 @@@
              }
              else
              {
 +                boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
                  // when we clear() the container, it removes the deletion info, so this needs to be reset each time
-                 container.setDeletionInfo(deletionInfo);
-                 ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
-                 if (purged == null || !purged.iterator().hasNext())
+                 container.delete(maxRowTombstone);
 -                ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
 -                if (purged == null || !purged.iterator().hasNext())
++                removeDeletedAndOldShards(key, shouldPurge, controller, container);
++                Iterator<Column> iter = container.iterator();
++                if (!iter.hasNext())
                  {
                      container.clear();
                      return null;
                  }
--                Column reduced = purged.iterator().next();
++                Column reduced = iter.next();
                  container.clear();
  
 -                // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
 +                // removeDeletedAndOldShards has only checked the top-level CF deletion times,
                  // not the range tombstone. For that we use the columnIndexer tombstone tracker.
                  if (indexBuilder.tombstoneTracker().isDeleted(reduced))
                  {
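
The net effect of this merge on purging: the old boolean shouldPurge(key, maxDeletionTimestamp) becomes a per-partition timestamp bound that is checked per cell during the reduce. In isolation (the `cell` variable is illustrative):

    // the oldest timestamp in any sstable outside this compaction that may
    // still contain `key`; Long.MAX_VALUE if there is no such sstable
    long maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);

    // a tombstone may be dropped only if nothing older can exist elsewhere
    boolean shouldPurge = cell.timestamp() < maxPurgeableTimestamp;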


[5/8] git commit: fix stats to omit purged row tombstone

Posted by jb...@apache.org.
fix stats to omit purged row tombstone


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d8da2ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d8da2ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d8da2ee

Branch: refs/heads/trunk
Commit: 0d8da2ee3c9de7b890b5630c3e0c74b8c80e63dc
Parents: 4e9a7b8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:35:41 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:35:41 2013 +0600

----------------------------------------------------------------------
 .../org/apache/cassandra/db/compaction/LazilyCompactedRow.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d8da2ee/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 0d33b22..0ad3de2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -112,7 +112,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxRowTombstone.markedForDeleteAt : Math.max(maxRowTombstone.markedForDeleteAt, reducer.maxTimestampSeen),
+                                      reducer == null ? emptyColumnFamily.maxTimestamp() : Math.max(emptyColumnFamily.maxTimestamp(), reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
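
Why this omits a purged row tombstone from the stats: purgeTombstones() resets the column family's top-level deletion to LIVE (markedForDeleteAt = Long.MIN_VALUE), and maxTimestamp() reflects whatever deletion info is actually left, whereas maxRowTombstone always held the pre-purge value. A sketch of the difference, assuming maxTimestamp() delegates to the deletion info:

    long before = maxRowTombstone.markedForDeleteAt;  // pre-purge value leaked into stats
    long after  = emptyColumnFamily.maxTimestamp();   // Long.MIN_VALUE once purged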


[7/8] git commit: add back shouldPurge check before counter merging

Posted by jb...@apache.org.
add back shouldPurge check before counter merging


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1711488
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1711488
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1711488

Branch: refs/heads/trunk
Commit: b1711488801781106c90e9143678f94d102e11dd
Parents: 5d25d6d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:44:13 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:44:13 2013 +0600

----------------------------------------------------------------------
 .../apache/cassandra/db/compaction/LazilyCompactedRow.java   | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1711488/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 23457bc..bb00d23 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -90,7 +90,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
     }
 
-    private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+    private static void removeDeletedAndOldShards(ColumnFamily cf, boolean shouldPurge, DecoratedKey key, CompactionController controller)
     {
         // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
         // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
@@ -99,10 +99,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
 
         // if we have counters, remove old shards
-        if (cf.metadata().getDefaultValidator().isCommutative())
+        if (shouldPurge && cf.metadata().getDefaultValidator().isCommutative())
             CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
-
-        return cf;
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -260,7 +258,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
                 // when we clear() the container, it removes the deletion info, so this needs to be reset each time
                 container.delete(maxRowTombstone);
-                removeDeletedAndOldShards(key, shouldPurge, controller, container);
+                removeDeletedAndOldShards(container, shouldPurge, key, controller);
                 Iterator<Column> iter = container.iterator();
                 if (!iter.hasNext())
                 {


[2/8] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d53c838c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d53c838c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d53c838c

Branch: refs/heads/trunk
Commit: d53c838c9d2f89ac6c88c8306f2302f7fbc6b33d
Parents: 448e4d4 3edb62b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:17:45 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:18:14 2013 +0600

----------------------------------------------------------------------
 .../db/AbstractThreadUnsafeSortedColumns.java   |  6 +-
 .../cassandra/db/AtomicSortedColumns.java       |  4 +-
 .../org/apache/cassandra/db/ColumnFamily.java   | 11 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 26 ++++++-
 .../org/apache/cassandra/db/ColumnIndex.java    |  2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   | 76 ++++++++++++++-----
 .../org/apache/cassandra/db/DeletionTime.java   | 16 ++++
 .../apache/cassandra/db/RangeTombstoneList.java |  2 +-
 .../db/compaction/CompactionController.java     |  5 +-
 .../db/compaction/LazilyCompactedRow.java       | 77 +++++++++++---------
 test/unit/org/apache/cassandra/Util.java        |  6 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  2 +-
 .../db/compaction/CompactionsPurgeTest.java     | 77 ++++++++++++++++++--
 .../streaming/StreamingTransferTest.java        |  4 +-
 14 files changed, 236 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 396bbd3,d585407..4e54af0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -885,6 -898,17 +897,16 @@@ public class ColumnFamilyStore implemen
              return null;
          }
  
 -        removeDeletedColumnsOnly(cf, gcBefore, indexer);
 -        return removeDeletedCF(cf, gcBefore);
++        return removeDeletedCF(removeDeletedColumnsOnly(cf, gcBefore, indexer), gcBefore);
+     }
+ 
+     /**
+      * Removes only per-cell tombstones, cells that are shadowed by a row-level or range tombstone, or
+      * columns that have been dropped from the schema (for CQL3 tables only).
+      * @return the updated ColumnFamily
+      */
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
++    public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
+     {
          Iterator<Column> iter = cf.iterator();
          DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
          boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
@@@ -899,10 -924,15 +921,10 @@@
              {
                  iter.remove();
                  indexer.remove(c);
 -                removedBytes += c.dataSize();
              }
          }
 -        return removedBytes;
 -    }
  
-         return removeDeletedCF(cf, gcBefore);
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
 -    {
 -        return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
++        return cf;
      }
  
      // returns true if

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index dc7730c,7edc60e..c4ce2e8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -148,24 -153,25 +148,25 @@@ public class CompactionControlle
      }
  
      /**
 -     * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
 -     * older than @param maxDeletionTimestamp are included in the compaction set
 +     * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
-      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be supressed
-      * in other sstables.
++     * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
++     * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
++     * participating in this compaction, or Long.MAX_VALUE if no such SSTable exists.
       */
 -    public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
 +    public long maxPurgeableTimestamp(DecoratedKey key)
      {
          List<SSTableReader> filteredSSTables = overlappingTree.search(key);
 +        long min = Long.MAX_VALUE;
          for (SSTableReader sstable : filteredSSTables)
          {
 -            if (sstable.getMinTimestamp() <= maxDeletionTimestamp)
 -            {
 -                // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
 -                // we check index file instead.
 -                if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 -                    return false;
 -                else if (sstable.getBloomFilter().isPresent(key.key))
 -                    return false;
 -            }
 +            // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
 +            // we check index file instead.
 +            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 +                min = Math.min(min, sstable.getMinTimestamp());
 +            else if (sstable.getBloomFilter().isPresent(key.key))
 +                min = Math.min(min, sstable.getMinTimestamp());
          }
 -        return true;
 +        return min;
      }
  
      public void invalidateCachedRow(DecoratedKey key)
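
The semantic shift is easiest to see from the caller's side: instead of a single up-front shouldPurge boolean, each tombstone's own timestamp is now compared against the per-partition bound. A minimal sketch, where tombstoneTimestamp is a hypothetical stand-in for whichever timestamp is being tested:

    long maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);

    // Purging is safe only if every version of the data this tombstone could
    // shadow is inside the compaction set, i.e. the tombstone's timestamp is
    // below the minimum timestamp of any overlapping sstable outside that set.
    boolean purgeable = tombstoneTimestamp < maxPurgeableTimestamp;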

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 2cb014a,3b7a3d4..8237ff5
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@@ -56,8 -58,8 +56,9 @@@ public class LazilyCompactedRow extend
      private boolean closed;
      private ColumnIndex.Builder indexBuilder;
      private final SecondaryIndexManager.Updater indexer;
 -    private long maxTombstoneTimestamp;
 +    private final Reducer reducer;
 +    private final Iterator<OnDiskAtom> merger;
+     private DeletionInfo deletionInfo;
  
      public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
      {
@@@ -66,39 -68,27 +67,38 @@@
          this.controller = controller;
          indexer = controller.cfs.indexManager.updaterFor(key);
  
-         ColumnFamily rawCf = null;
+         // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
+         // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
+         deletionInfo = DeletionInfo.live();
 -        maxTombstoneTimestamp = Long.MIN_VALUE;
          for (OnDiskAtomIterator row : rows)
--        {
-             ColumnFamily cf = row.getColumnFamily();
 -            DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
 -            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
 -            deletionInfo = deletionInfo.add(delInfo);
 -        }
++            deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo());
  
-             if (rawCf == null)
-                 rawCf = cf;
-             else
-                 rawCf.delete(cf);
-         }
 -        // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
 -        // tombstones newer than the row-level tombstones we've seen -- but we won't know that
 -        // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
 -        // no other versions of this row present.
 -        this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
++        // tombstones with a localDeletionTime before this can be purged.  This is the minimum timestamp for any sstable
++        // containing `key` outside of the set of sstables involved in this compaction.
 +        maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
-         // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
-         // to get rid of redundant row-level and range tombstones
-         assert rawCf != null;
-         int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE;
-         ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
-         emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+ 
+         emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
+         emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
 -        if (shouldPurge)
++        if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp)
+             emptyColumnFamily.purgeTombstones(controller.gcBefore);
 +
 +        reducer = new Reducer();
 +        merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
 +    }
 +
-     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
++    private static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
 +    {
-         // We should only gc tombstone if shouldPurge == true. But otherwise,
-         // it is still ok to collect column that shadowed by their (deleted)
-         // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
-         ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
-                                                                  shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
-                                                                  controller.cfs.indexManager.updaterFor(key));
-         if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-             CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-         return compacted;
++        // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that
++        // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this
++        // without purging tombstones.
++        int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
++        ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.updaterFor(key));
++
++        // if we have counters, remove old shards
++        if (cf.metadata().getDefaultValidator().isCommutative())
++            CounterColumn.mergeAndRemoveOldShards(key, cf, controller.gcBefore, controller.mergeShardBefore);
++
++        return cf;
      }
  
      public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@@ -109,31 -99,31 +109,29 @@@
          try
          {
              indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
 -            columnsIndex = indexBuilder.buildForCompaction(iterator());
 +            columnsIndex = indexBuilder.buildForCompaction(merger);
-             if (columnsIndex.columnsIndex.isEmpty())
-             {
-                 boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
-                                      ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                                      : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
-                 if (cfIrrelevant)
-                     return null;
-             }
+ 
+             // if there aren't any columns or tombstones, return null
+             if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete())
+                 return null;
          }
          catch (IOException e)
          {
              throw new RuntimeException(e);
          }
          // reach into the reducer (created during iteration) to get column count, size, max column timestamp
 -        // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
 -        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
 -                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
 -                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
 -                                      reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
 -                                      reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
 -                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
 -                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
 +        columnStats = new ColumnStats(reducer.columns,
 +                                      reducer.minTimestampSeen,
 +                                      Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen),
 +                                      reducer.maxLocalDeletionTimeSeen,
 +                                      reducer.tombstones,
 +                                      reducer.minColumnNameSeen,
 +                                      reducer.maxColumnNameSeen
          );
 -        reducer = null;
  
+         // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
          indexBuilder.maybeWriteEmptyRowHeader();
+ 
          out.writeShort(SSTableWriter.END_OF_ROW);
  
          close();
@@@ -245,8 -257,9 +252,10 @@@
              }
              else
              {
 +                boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
+                 // when we clear() the container, it removes the deletion info, so this needs to be reset each time
+                 container.setDeletionInfo(deletionInfo);
 -                ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
 +                ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
                  if (purged == null || !purged.iterator().hasNext())
                  {
                      container.clear();
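
The @@@ markers make the constructor hunk earlier in this diff hard to follow; stripped to the lines that survive the merge, the new flow is roughly:

    deletionInfo = DeletionInfo.live();
    for (OnDiskAtomIterator row : rows)
        deletionInfo = deletionInfo.add(row.getColumnFamily().deletionInfo());

    // minimum timestamp of any sstable containing `key` outside this compaction
    maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);

    emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
    emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
    if (deletionInfo.maxTimestamp() < maxPurgeableTimestamp)
        emptyColumnFamily.purgeTombstones(controller.gcBefore);

    reducer = new Reducer();
    merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer),
                              Predicates.notNull());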

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 18e637b,8461023..bd814d0
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@@ -38,8 -38,10 +38,11 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.Util;
  
  import static org.junit.Assert.assertEquals;
 +import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
  import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
 -import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++
+ import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
  
  import org.apache.cassandra.utils.ByteBufferUtil;
  
@@@ -286,8 -283,10 +289,9 @@@ public class CompactionsPurgeTest exten
          String cfName = "Standard1";
          Keyspace keyspace = Keyspace.open(keyspaceName);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
 -
          DecoratedKey key = Util.dk("key3");
          RowMutation rm;
+         QueryFilter filter = QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis());
  
          // inserts
          rm = new RowMutation(keyspaceName, key.key);
@@@ -316,4 -322,60 +323,60 @@@
          for (Column c : cf)
              assert !c.isMarkedForDelete(System.currentTimeMillis());
      }
- }
+ 
+     @Test
+     public void testRowTombstoneObservedBeforePurging() throws InterruptedException, ExecutionException, IOException
+     {
+         String keyspace = "cql_keyspace";
+         String table = "table1";
+         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+         cfs.disableAutoCompaction();
+ 
+         // write a row out to one sstable
+         processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                       keyspace, table, 1, "foo", 1));
+         cfs.forceBlockingFlush();
+ 
+         UntypedResultSet result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(1, result.size());
+ 
+         // write a row tombstone out to a second sstable
+         processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         cfs.forceBlockingFlush();
+ 
+         // basic check that the row is considered deleted
+         assertEquals(2, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(0, result.size());
+ 
+         // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
+         Future future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
+         future.get();
+ 
+         // the data should be gone, but the tombstone should still exist
+         assertEquals(1, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(0, result.size());
+ 
+         // write a row out to one sstable
+         processInternal(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, '%s', %d)",
+                                       keyspace, table, 1, "foo", 1));
+         cfs.forceBlockingFlush();
+         assertEquals(2, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(1, result.size());
+ 
+         // write a row tombstone out to a different sstable
+         processInternal(String.format("DELETE FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         cfs.forceBlockingFlush();
+ 
+         // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
+         future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
+         future.get();
+ 
+         // both the data and the tombstone should be gone this time
+         assertEquals(0, cfs.getSSTables().size());
+         result = processInternal(String.format("SELECT * FROM %s.%s WHERE k = %d", keyspace, table, 1));
+         assertEquals(0, result.size());
+     }
 -}
++}
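
A note on the gcBefore arithmetic in the new test: gcBefore is a cutoff in seconds since the epoch, and tombstones are purge candidates only when their local deletion time falls before it. The variable names below are just labels for the two expressions the test passes to submitMaximal:

    // cutoff 10000s in the past: the just-written tombstone is newer, so it survives
    int keepTombstones = (int) (System.currentTimeMillis() / 1000) - 10000;
    // cutoff 10000s in the future: the tombstone falls before it, so it may be purged
    int purgeTombstones = (int) (System.currentTimeMillis() / 1000) + 10000;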

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d53c838c/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------


[3/8] git commit: clarify that we only collect row-level tombstone in LCR constructor

Posted by jb...@apache.org.
clarify that we only collect row-level tombstone in LCR constructor


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e9a7b8c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e9a7b8c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e9a7b8c

Branch: refs/heads/trunk
Commit: 4e9a7b8c7fa55df9cda4ac06f77ee9c69b85314d
Parents: 3edb62b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 12 23:43:59 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:26:14 2013 +0600

----------------------------------------------------------------------
 .../db/compaction/LazilyCompactedRow.java       | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e9a7b8c/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 3b7a3d4..0d33b22 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -58,8 +58,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxTombstoneTimestamp;
-    private DeletionInfo deletionInfo;
+    private DeletionTime maxRowTombstone;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -70,23 +69,23 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
         // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp.  This may be
         // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge
-        deletionInfo = DeletionInfo.live();
-        maxTombstoneTimestamp = Long.MIN_VALUE;
+        maxRowTombstone = DeletionTime.LIVE;
         for (OnDiskAtomIterator row : rows)
         {
-            DeletionInfo delInfo = row.getColumnFamily().deletionInfo();
-            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, delInfo.maxTimestamp());
-            deletionInfo = deletionInfo.add(delInfo);
+            DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion();
+            if (maxRowTombstone.compareTo(rowTombstone) < 0)
+                maxRowTombstone = rowTombstone;
         }
 
+
         // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
         // tombstones newer than the row-level tombstones we've seen -- but we won't know that
         // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
         // no other versions of this row present.
         this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
 
-        emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-        emptyColumnFamily.setDeletionInfo(deletionInfo.copy());
+        emptyColumnFamily = EmptyColumns.factory.create(controller.cfs.metadata);
+        emptyColumnFamily.delete(maxRowTombstone);
         if (shouldPurge)
             emptyColumnFamily.purgeTombstones(controller.gcBefore);
     }
@@ -113,7 +112,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
+                                      reducer == null ? maxRowTombstone.markedForDeleteAt : Math.max(maxRowTombstone.markedForDeleteAt, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -193,8 +192,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
     {
         // all columns reduced together will have the same name, so there will only be one column
-        // in the container; we just want to leverage the conflict resolution code from CF
-        ColumnFamily container = emptyColumnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
+        // in the container; we just want to leverage the conflict resolution code from CF.
+        // (Note that we add the row tombstone in getReduced.)
+        final ColumnFamily container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata());
 
         // tombstone reference; will be reconciled w/ column during getReduced.  Note that the top-level (row) tombstone
         // is held by LCR.deletionInfo.
@@ -258,7 +258,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             else
             {
                 // when we clear() the container, it removes the deletion info, so this needs to be reset each time
-                container.setDeletionInfo(deletionInfo);
+                container.delete(maxRowTombstone);
                 ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
                 if (purged == null || !purged.iterator().hasNext())
                 {
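
Stripped of diff markers, the replacement loop keeps a single DeletionTime rather than a merged DeletionInfo; per the in-code comment, the compareTo ordering means the row tombstone with the highest markedForDeleteAt wins:

    DeletionTime maxRowTombstone = DeletionTime.LIVE;
    for (OnDiskAtomIterator row : rows)
    {
        DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion();
        if (maxRowTombstone.compareTo(rowTombstone) < 0)
            maxRowTombstone = rowTombstone;  // keep the newest top-level tombstone
    }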


[6/8] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d25d6d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d25d6d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d25d6d2

Branch: refs/heads/trunk
Commit: 5d25d6d22a17e64347e1d311921ae61b52cb3ae3
Parents: 0bfa210 0d8da2e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:35:48 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:35:48 2013 +0600

----------------------------------------------------------------------

----------------------------------------------------------------------



[8/8] git commit: Merge remote-tracking branch 'origin/trunk' into trunk

Posted by jb...@apache.org.
Merge remote-tracking branch 'origin/trunk' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b93c25
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b93c25
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b93c25

Branch: refs/heads/trunk
Commit: a9b93c257304a4bf76f301d483e3264fba934f80
Parents: b171148 d16d5c4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 13 00:46:57 2013 +0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 13 00:46:57 2013 +0600

----------------------------------------------------------------------

----------------------------------------------------------------------