You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/01/25 10:13:16 UTC

[01/10] cassandra git commit: maxPurgeableTimestamp needs to check memtables too

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 b4d67c9e5 -> b5d6d4f72
  refs/heads/cassandra-3.0 2f8e5f346 -> 442f4737c
  refs/heads/cassandra-3.3 acb7fed1d -> f2174280d
  refs/heads/trunk 72790dc8e -> 4abd99363


maxPurgeableTimestamp needs to check memtables too

Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7

Branch: refs/heads/cassandra-2.2
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/AtomicBTreeColumns.java |  14 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  15 +-
 .../db/compaction/CompactionController.java     |  21 +-
 .../db/compaction/LazilyCompactedRow.java       |   2 +-
 .../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
 6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
  * Apply change to compaction throughput in real time (CASSANDRA-10025)
  * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
  * Avoid over-fetching during the page of range queries (CASSANDRA-8521)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
      *
      * @return the difference in size seen after merging the given columns
      */
-    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+    public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
     {
         ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
         DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 {
                     indexer.updateRowLevelIndexes();
                     updater.finish();
-                    return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+                    return updater;
                 }
                 else if (!monitorOwned)
                 {
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
     }
 
     // the function we provide to the btree utilities to perform any column replacements
-    private static final class ColumnUpdater implements UpdateFunction<Cell>
+    static final class ColumnUpdater implements UpdateFunction<Cell>
     {
         final AtomicBTreeColumns updating;
         final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
         long colUpdateTimeDelta = Long.MAX_VALUE;
         final MemtableAllocator.DataReclaimer reclaimer;
         List<Cell> inserted; // TODO: replace with walk of aborted BTree
+        long minTimestamp = Long.MAX_VALUE;
 
         private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
         {
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(insert);
+            minTimestamp = Math.min(minTimestamp, insert.timestamp());
             return insert;
         }
 
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
         {
             Cell reconciled = existing.reconcile(update);
             indexer.update(existing, reconciled);
+            // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+            // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+            // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+            // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+            minTimestamp = Math.min(minTimestamp, update.timestamp());
             if (existing != reconciled)
             {
                 reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 inserted.clear();
             }
             reclaimer.cancel();
+            minTimestamp = Long.MAX_VALUE;
         }
 
         protected void abort(Cell abort)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
 
+    // The smallest timestamp for all partitions stored in this memtable
+    private long minTimestamp = Long.MAX_VALUE;
+
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
-        liveDataSize.addAndGet(initialSize + pair.left);
+        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+        liveDataSize.addAndGet(initialSize + updater.dataSize);
         currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-        return pair.right;
+        return updater.colUpdateTimeDelta;
     }
 
     // for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
         return creationTime;
     }
 
+    public long getMinTimestamp()
+    {
+        return minTimestamp;
+    }
+
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
      * Finds expired sstables
      *
      * works something like this;
-     * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+     * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
      * 2. build a list of fully expired candidates
      * 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
      *    - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
                 minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
         }
 
+        for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+            minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
         // At this point, minTimestamp denotes the lowest timestamp of any relevant
-        // SSTable that contains a constructive value. candidates contains all the
+        // SSTable or Memtable that contains a constructive value. candidates contains all the
         // candidates with no constructive values. The ones out of these that have
         // (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
 
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
      * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
      * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
-     * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+     * participating in this compaction, or any memtable that contains this partition,
+     * or Long.MAX_VALUE if no such SSTable or memtable exists.
      */
     public long maxPurgeableTimestamp(DecoratedKey key)
     {
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
             else if (sstable.getBloomFilter().isPresent(key))
                 min = Math.min(min, sstable.getMinTimestamp());
         }
+
+        for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+        {
+            ColumnFamily cf = memtable.getColumnFamily(key);
+            if (cf != null)
+                min = Math.min(min, memtable.getMinTimestamp());
+        }
         return min;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "CompactionControllerTest";
+    private static final String CF1 = "Standard1";
+    private static final String CF2 = "Standard2";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
+    }
+
+    @Test
+    public void testMaxPurgeableTimestamp()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // add to first memtable
+        applyMutation(CF1, rowKey, timestamp1);
+
+        // check max purgeable timestamp without any sstables
+        try(CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+            cfs.forceBlockingFlush();
+            assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+        }
+
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+        // create another sstable
+        applyMutation(CF1, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // check max purgeable timestamp when compacting the first sstable with and without a memtable
+        try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+        }
+
+        // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+        cfs.forceBlockingFlush();
+
+        //newest to oldest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp1);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+
+        cfs.forceBlockingFlush();
+
+        //oldest to newest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp3);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp1);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+    }
+
+    @Test
+    public void testGetFullyExpiredSSTables()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // create an sstable with a tombstone that should be expired if there are no older timestamps
+        applyDeleteMutation(CF2, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // first sstable with tombstone is compacting
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+        // create another sstable with more recent timestamp
+        applyMutation(CF2, rowKey, timestamp1);
+        cfs.forceBlockingFlush();
+
+        // second sstable is overlapping
+        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+        // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+        int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(1, expired.size());
+        assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+        // however if we add an older mutation to the memtable then the sstable should not be expired
+        applyMutation(CF2, rowKey, timestamp3);
+        expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(0, expired.size());
+    }
+
+    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        CellName colName = cellname("birthdate");
+        ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.add(cf, colName, val, timestamp);
+        rm.applyUnsafe();
+    }
+
+    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.delete(cf, timestamp);
+        rm.applyUnsafe();
+    }
+
+
+
+}


[03/10] cassandra git commit: maxPurgeableTimestamp needs to check memtables too

Posted by ma...@apache.org.
maxPurgeableTimestamp needs to check memtables too

Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7

Branch: refs/heads/cassandra-3.3
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/AtomicBTreeColumns.java |  14 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  15 +-
 .../db/compaction/CompactionController.java     |  21 +-
 .../db/compaction/LazilyCompactedRow.java       |   2 +-
 .../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
 6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
  * Apply change to compaction throughput in real time (CASSANDRA-10025)
  * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
  * Avoid over-fetching during the page of range queries (CASSANDRA-8521)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
      *
      * @return the difference in size seen after merging the given columns
      */
-    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+    public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
     {
         ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
         DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 {
                     indexer.updateRowLevelIndexes();
                     updater.finish();
-                    return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+                    return updater;
                 }
                 else if (!monitorOwned)
                 {
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
     }
 
     // the function we provide to the btree utilities to perform any column replacements
-    private static final class ColumnUpdater implements UpdateFunction<Cell>
+    static final class ColumnUpdater implements UpdateFunction<Cell>
     {
         final AtomicBTreeColumns updating;
         final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
         long colUpdateTimeDelta = Long.MAX_VALUE;
         final MemtableAllocator.DataReclaimer reclaimer;
         List<Cell> inserted; // TODO: replace with walk of aborted BTree
+        long minTimestamp = Long.MAX_VALUE;
 
         private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
         {
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(insert);
+            minTimestamp = Math.min(minTimestamp, insert.timestamp());
             return insert;
         }
 
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
         {
             Cell reconciled = existing.reconcile(update);
             indexer.update(existing, reconciled);
+            // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+            // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+            // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+            // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+            minTimestamp = Math.min(minTimestamp, update.timestamp());
             if (existing != reconciled)
             {
                 reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 inserted.clear();
             }
             reclaimer.cancel();
+            minTimestamp = Long.MAX_VALUE;
         }
 
         protected void abort(Cell abort)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
 
+    // The smallest timestamp for all partitions stored in this memtable
+    private long minTimestamp = Long.MAX_VALUE;
+
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
-        liveDataSize.addAndGet(initialSize + pair.left);
+        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+        liveDataSize.addAndGet(initialSize + updater.dataSize);
         currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-        return pair.right;
+        return updater.colUpdateTimeDelta;
     }
 
     // for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
         return creationTime;
     }
 
+    public long getMinTimestamp()
+    {
+        return minTimestamp;
+    }
+
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
      * Finds expired sstables
      *
      * works something like this;
-     * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+     * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
      * 2. build a list of fully expired candidates
      * 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
      *    - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
                 minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
         }
 
+        for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+            minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
         // At this point, minTimestamp denotes the lowest timestamp of any relevant
-        // SSTable that contains a constructive value. candidates contains all the
+        // SSTable or Memtable that contains a constructive value. candidates contains all the
         // candidates with no constructive values. The ones out of these that have
         // (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
 
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
      * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
      * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
-     * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+     * participating in this compaction, or any memtable that contains this partition,
+     * or Long.MAX_VALUE if no such SSTable or memtable exists.
      */
     public long maxPurgeableTimestamp(DecoratedKey key)
     {
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
             else if (sstable.getBloomFilter().isPresent(key))
                 min = Math.min(min, sstable.getMinTimestamp());
         }
+
+        for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+        {
+            ColumnFamily cf = memtable.getColumnFamily(key);
+            if (cf != null)
+                min = Math.min(min, memtable.getMinTimestamp());
+        }
         return min;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "CompactionControllerTest";
+    private static final String CF1 = "Standard1";
+    private static final String CF2 = "Standard2";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
+    }
+
+    @Test
+    public void testMaxPurgeableTimestamp()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // add to first memtable
+        applyMutation(CF1, rowKey, timestamp1);
+
+        // check max purgeable timestamp without any sstables
+        try(CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+            cfs.forceBlockingFlush();
+            assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+        }
+
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+        // create another sstable
+        applyMutation(CF1, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // check max purgeable timestamp when compacting the first sstable with and without a memtable
+        try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+        }
+
+        // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+        cfs.forceBlockingFlush();
+
+        //newest to oldest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp1);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+
+        cfs.forceBlockingFlush();
+
+        //oldest to newest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp3);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp1);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+    }
+
+    @Test
+    public void testGetFullyExpiredSSTables()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // create an sstable with a tombstone that should be expired as long as no older timestamps exist elsewhere
+        applyDeleteMutation(CF2, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // first sstable with tombstone is compacting
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+        // create another sstable with more recent timestamp
+        applyMutation(CF2, rowKey, timestamp1);
+        cfs.forceBlockingFlush();
+
+        // second sstable is overlapping
+        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+        // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+        int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(1, expired.size());
+        assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+        // however if we add an older mutation to the memtable then the sstable should not be expired
+        applyMutation(CF2, rowKey, timestamp3);
+        expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(0, expired.size());
+    }
+
+    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        CellName colName = cellname("birthdate");
+        ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.add(cf, colName, val, timestamp);
+        rm.applyUnsafe();
+    }
+
+    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.delete(cf, timestamp);
+        rm.applyUnsafe();
+    }
+
+
+
+}


[10/10] cassandra git commit: Merge branch 'cassandra-3.3' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-3.3' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4abd9936
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4abd9936
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4abd9936

Branch: refs/heads/trunk
Commit: 4abd9936342f5faf3e23c3d395e24278a3b3bfbd
Parents: 72790dc f217428
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:09:31 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:09:31 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |   9 +
 .../db/compaction/CompactionController.java     |  19 +-
 .../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
 4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4abd9936/CHANGES.txt
----------------------------------------------------------------------


[04/10] cassandra git commit: maxPurgeableTimestamp needs to check memtables too

Posted by ma...@apache.org.
maxPurgeableTimestamp needs to check memtables too

Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7

Branch: refs/heads/trunk
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/AtomicBTreeColumns.java |  14 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  15 +-
 .../db/compaction/CompactionController.java     |  21 +-
 .../db/compaction/LazilyCompactedRow.java       |   2 +-
 .../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
 6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
  * Apply change to compaction throughput in real time (CASSANDRA-10025)
  * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
  * Avoid over-fetching during the page of range queries (CASSANDRA-8521)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
      *
      * @return the difference in size seen after merging the given columns
      */
-    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+    public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
     {
         ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
         DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 {
                     indexer.updateRowLevelIndexes();
                     updater.finish();
-                    return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+                    return updater;
                 }
                 else if (!monitorOwned)
                 {
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
     }
 
     // the function we provide to the btree utilities to perform any column replacements
-    private static final class ColumnUpdater implements UpdateFunction<Cell>
+    static final class ColumnUpdater implements UpdateFunction<Cell>
     {
         final AtomicBTreeColumns updating;
         final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
         long colUpdateTimeDelta = Long.MAX_VALUE;
         final MemtableAllocator.DataReclaimer reclaimer;
         List<Cell> inserted; // TODO: replace with walk of aborted BTree
+        long minTimestamp = Long.MAX_VALUE;
 
         private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
         {
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(insert);
+            minTimestamp = Math.min(minTimestamp, insert.timestamp());
             return insert;
         }
 
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
         {
             Cell reconciled = existing.reconcile(update);
             indexer.update(existing, reconciled);
+            // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+            // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+            // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+            // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+            minTimestamp = Math.min(minTimestamp, update.timestamp());
             if (existing != reconciled)
             {
                 reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 inserted.clear();
             }
             reclaimer.cancel();
+            minTimestamp = Long.MAX_VALUE;
         }
 
         protected void abort(Cell abort)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
 
+    // The smallest timestamp for all partitions stored in this memtable
+    private long minTimestamp = Long.MAX_VALUE;
+
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
-        liveDataSize.addAndGet(initialSize + pair.left);
+        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+        liveDataSize.addAndGet(initialSize + updater.dataSize);
         currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-        return pair.right;
+        return updater.colUpdateTimeDelta;
     }
 
     // for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
         return creationTime;
     }
 
+    public long getMinTimestamp()
+    {
+        return minTimestamp;
+    }
+
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
      * Finds expired sstables
      *
      * works something like this;
-     * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+     * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
      * 2. build a list of fully expired candidates
      * 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
      *    - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
                 minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
         }
 
+        for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+            minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
         // At this point, minTimestamp denotes the lowest timestamp of any relevant
-        // SSTable that contains a constructive value. candidates contains all the
+        // SSTable or Memtable that contains a constructive value. candidates contains all the
         // candidates with no constructive values. The ones out of these that have
         // (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
 
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
      * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
      * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
-     * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+     * participating in this compaction, or memtable that contains this partition,
+     * or Long.MAX_VALUE if no such SSTable or memtable exists.
      */
     public long maxPurgeableTimestamp(DecoratedKey key)
     {
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
             else if (sstable.getBloomFilter().isPresent(key))
                 min = Math.min(min, sstable.getMinTimestamp());
         }
+
+        for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+        {
+            ColumnFamily cf = memtable.getColumnFamily(key);
+            if (cf != null)
+                min = Math.min(min, memtable.getMinTimestamp());
+        }
         return min;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "CompactionControllerTest";
+    private static final String CF1 = "Standard1";
+    private static final String CF2 = "Standard2";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
+    }
+
+    @Test
+    public void testMaxPurgeableTimestamp()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // add to first memtable
+        applyMutation(CF1, rowKey, timestamp1);
+
+        // check max purgeable timestamp without any sstables
+        try(CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+            cfs.forceBlockingFlush();
+            assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+        }
+
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+        // create another sstable
+        applyMutation(CF1, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // check max purgeable timestamp when compacting the first sstable with and without a memtable
+        try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+        }
+
+        // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+        cfs.forceBlockingFlush();
+
+        //newest to oldest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp1);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+
+        cfs.forceBlockingFlush();
+
+        //oldest to newest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp3);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp1);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+    }
+
+    @Test
+    public void testGetFullyExpiredSSTables()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // create an sstable with a tombstone that should be expired as long as no older timestamps exist elsewhere
+        applyDeleteMutation(CF2, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // first sstable with tombstone is compacting
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+        // create another sstable with more recent timestamp
+        applyMutation(CF2, rowKey, timestamp1);
+        cfs.forceBlockingFlush();
+
+        // second sstable is overlapping
+        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+        // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+        int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(1, expired.size());
+        assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+        // however if we add an older mutation to the memtable then the sstable should not be expired
+        applyMutation(CF2, rowKey, timestamp3);
+        expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(0, expired.size());
+    }
+
+    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        CellName colName = cellname("birthdate");
+        ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.add(cf, colName, val, timestamp);
+        rm.applyUnsafe();
+    }
+
+    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.delete(cf, timestamp);
+        rm.applyUnsafe();
+    }
+
+
+
+}


[02/10] cassandra git commit: maxPurgeableTimestamp needs to check memtables too

Posted by ma...@apache.org.
maxPurgeableTimestamp needs to check memtables too

Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7

Branch: refs/heads/cassandra-3.0
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/AtomicBTreeColumns.java |  14 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  15 +-
 .../db/compaction/CompactionController.java     |  21 +-
 .../db/compaction/LazilyCompactedRow.java       |   2 +-
 .../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
 6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
  * Apply change to compaction throughput in real time (CASSANDRA-10025)
  * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
  * Avoid over-fetching during the page of range queries (CASSANDRA-8521)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
      *
      * @return the difference in size seen after merging the given columns
      */
-    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+    public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
     {
         ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
         DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 {
                     indexer.updateRowLevelIndexes();
                     updater.finish();
-                    return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+                    return updater;
                 }
                 else if (!monitorOwned)
                 {
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
     }
 
     // the function we provide to the btree utilities to perform any column replacements
-    private static final class ColumnUpdater implements UpdateFunction<Cell>
+    static final class ColumnUpdater implements UpdateFunction<Cell>
     {
         final AtomicBTreeColumns updating;
         final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
         long colUpdateTimeDelta = Long.MAX_VALUE;
         final MemtableAllocator.DataReclaimer reclaimer;
         List<Cell> inserted; // TODO: replace with walk of aborted BTree
+        long minTimestamp = Long.MAX_VALUE;
 
         private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
         {
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(insert);
+            minTimestamp = Math.min(minTimestamp, insert.timestamp());
             return insert;
         }
 
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
         {
             Cell reconciled = existing.reconcile(update);
             indexer.update(existing, reconciled);
+            // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+            // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+            // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+            // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+            minTimestamp = Math.min(minTimestamp, update.timestamp());
             if (existing != reconciled)
             {
                 reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
                 inserted.clear();
             }
             reclaimer.cancel();
+            minTimestamp = Long.MAX_VALUE;
         }
 
         protected void abort(Cell abort)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
 
+    // The smallest timestamp for all partitions stored in this memtable
+    private long minTimestamp = Long.MAX_VALUE;
+
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
-        liveDataSize.addAndGet(initialSize + pair.left);
+        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+        liveDataSize.addAndGet(initialSize + updater.dataSize);
         currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-        return pair.right;
+        return updater.colUpdateTimeDelta;
     }
 
     // for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
         return creationTime;
     }
 
+    public long getMinTimestamp()
+    {
+        return minTimestamp;
+    }
+
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
      * Finds expired sstables
      *
      * works something like this;
-     * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+     * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
      * 2. build a list of fully expired candidates
      * 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
      *    - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
                 minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
         }
 
+        for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+            minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
         // At this point, minTimestamp denotes the lowest timestamp of any relevant
-        // SSTable that contains a constructive value. candidates contains all the
+        // SSTable or Memtable that contains a constructive value. candidates contains all the
         // candidates with no constructive values. The ones out of these that have
         // (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
 
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
      * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
      * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
-     * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+     * participating in this compaction, or for any memtable that contains this partition,
+     * or Long.MAX_VALUE if no such SSTable or memtable exists.
      */
     public long maxPurgeableTimestamp(DecoratedKey key)
     {
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
             else if (sstable.getBloomFilter().isPresent(key))
                 min = Math.min(min, sstable.getMinTimestamp());
         }
+
+        for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+        {
+            ColumnFamily cf = memtable.getColumnFamily(key);
+            if (cf != null)
+                min = Math.min(min, memtable.getMinTimestamp());
+        }
         return min;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                 RangeTombstone t = tombstone;
                 tombstone = null;
 
-                if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+                if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
                 {
                     indexBuilder.tombstoneTracker().update(t, true);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "CompactionControllerTest";
+    private static final String CF1 = "Standard1";
+    private static final String CF2 = "Standard2";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
+    }
+
+    @Test
+    public void testMaxPurgeableTimestamp()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // add to first memtable
+        applyMutation(CF1, rowKey, timestamp1);
+
+        // check max purgeable timestamp without any sstables
+        try(CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+            cfs.forceBlockingFlush();
+            assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+        }
+
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+        // create another sstable
+        applyMutation(CF1, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // check max purgeable timestamp when compacting the first sstable with and without a memtable
+        try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+        }
+
+        // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+        cfs.forceBlockingFlush();
+
+        //newest to oldest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp1);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp3);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+
+        cfs.forceBlockingFlush();
+
+        //oldest to newest
+        try (CompactionController controller = new CompactionController(cfs, null, 0))
+        {
+            applyMutation(CF1, rowKey, timestamp3);
+            applyMutation(CF1, rowKey, timestamp2);
+            applyMutation(CF1, rowKey, timestamp1);
+
+            assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+        }
+    }
+
+    @Test
+    public void testGetFullyExpiredSSTables()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+        cfs.truncateBlocking();
+
+        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+        long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+        long timestamp2 = timestamp1 - 5;
+        long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+        // create an sstable with a tombstone that should be expired as long as no older timestamps exist
+        applyDeleteMutation(CF2, rowKey, timestamp2);
+        cfs.forceBlockingFlush();
+
+        // first sstable with tombstone is compacting
+        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+        // create another sstable with more recent timestamp
+        applyMutation(CF2, rowKey, timestamp1);
+        cfs.forceBlockingFlush();
+
+        // second sstable is overlapping
+        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+        // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+        int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(1, expired.size());
+        assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+        // however if we add an older mutation to the memtable then the sstable should not be expired
+        applyMutation(CF2, rowKey, timestamp3);
+        expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+        assertNotNull(expired);
+        assertEquals(0, expired.size());
+    }
+
+    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        CellName colName = cellname("birthdate");
+        ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.add(cf, colName, val, timestamp);
+        rm.applyUnsafe();
+    }
+
+    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+    {
+        Mutation rm = new Mutation(KEYSPACE, rowKey);
+        rm.delete(cf, timestamp);
+        rm.applyUnsafe();
+    }
+
+
+
+}


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.3

Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.3


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f2174280
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f2174280
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f2174280

Branch: refs/heads/cassandra-3.3
Commit: f2174280d6dcbb951b539dc558d1e1ae81f43b5c
Parents: acb7fed 442f473
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:09:15 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:09:15 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |   9 +
 .../db/compaction/CompactionController.java     |  19 +-
 .../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
 4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ed56a78,70abffe..81657f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,9 -11,20 +15,10 @@@ Merged from 3.0
     tombstone (CASSANDRA-10743)
   * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
   * Fix potential assertion error during compaction (CASSANDRA-10944)
 - * Fix counting of received sstables in streaming (CASSANDRA-10949)
 - * Implement hints compression (CASSANDRA-9428)
 - * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 - * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 - * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 - * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 - * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 - * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 - * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 - * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
  Merged from 2.2:
 -2.2.5
+  * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
   * Apply change to compaction throughput in real time (CASSANDRA-10025)
 + * (cqlsh) encode input correctly when saving history
   * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
   * Start L0 STCS-compactions even if there is a L0 -> L1 compaction
     going (CASSANDRA-10979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 8e7a43c,5d5f7bf..952c045
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -334,37 -316,24 +338,42 @@@ public class Memtable implements Compar
          return creationTime;
      }
  
+     public long getMinTimestamp()
+     {
+         return minTimestamp;
+     }
+ 
 -    class FlushRunnable extends DiskAwareRunnable
 +    class FlushRunnable implements Callable<SSTableMultiWriter>
      {
 -        private final ReplayPosition context;
 +        public final ReplayPosition context;
          private final long estimatedSize;
 +        private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
  
          private final boolean isBatchLogTable;
 +        private final SSTableMultiWriter writer;
 +
 +        // keeping these to be able to log what we are actually flushing
 +        private final PartitionPosition from;
 +        private final PartitionPosition to;
  
 -        FlushRunnable(ReplayPosition context)
 +        FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
          {
 -            this.context = context;
 +            this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
 +        }
 +
 +        FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
 +        {
 +            this(context, partitions, null, null, null, txn);
 +        }
  
 +        FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
 +        {
 +            this.context = context;
 +            this.toFlush = toFlush;
 +            this.from = from;
 +            this.to = to;
              long keySize = 0;
 -            for (PartitionPosition key : partitions.keySet())
 +            for (PartitionPosition key : toFlush.keySet())
              {
                  //  make sure we don't write non-sensical keys
                  assert key instanceof DecoratedKey;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index a7224a1,5cb60c5..79c6d53
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -190,10 -196,18 +196,17 @@@ public class CompactionController imple
          {
              // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
              // we check index file instead.
 -            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 -                min = Math.min(min, sstable.getMinTimestamp());
 -            else if (sstable.getBloomFilter().isPresent(key))
 +            if ((sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 +                || sstable.getBloomFilter().isPresent(key))
                  min = Math.min(min, sstable.getMinTimestamp());
          }
+ 
+         for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+         {
+             Partition partition = memtable.getPartition(key);
+             if (partition != null)
+                 min = Math.min(min, partition.stats().minTimestamp);
+         }
          return min;
      }
  


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442f4737
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442f4737
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442f4737

Branch: refs/heads/cassandra-3.0
Commit: 442f4737ceddd34f14210da49cee4d48b468f01e
Parents: 2f8e5f3 b5d6d4f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:05:26 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:05:59 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |   9 +
 .../db/compaction/CompactionController.java     |  19 +-
 .../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
 4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bb26b20,cdc3b34..70abffe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,8 +1,31 @@@
 +3.0.3
 + * Update CQL documentation (CASSANDRA-10899)
 + * Check the column name, not cell name, for dropped columns when reading
 +   legacy sstables (CASSANDRA-11018)
 + * Don't attempt to index clustering values of static rows (CASSANDRA-11021)
 + * Remove checksum files after replaying hints (CASSANDRA-10947)
 + * Support passing base table metadata to custom 2i validation (CASSANDRA-10924)
 + * Ensure stale index entries are purged during reads (CASSANDRA-11013)
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
  2.2.5
+  * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
   * Apply change to compaction throughput in real time (CASSANDRA-10025)
   * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
 - * Avoid over-fetching during the page of range queries (CASSANDRA-8521)
   * Start L0 STCS-compactions even if there is a L0 -> L1 compaction
     going (CASSANDRA-10979)
   * Make UUID LSB unique per process (CASSANDRA-7925)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 96b1775,fb4da72..5d5f7bf
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -237,12 -227,24 +240,13 @@@ public class Memtable implements Compar
              }
          }
  
 -        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
 -        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
 -        liveDataSize.addAndGet(initialSize + updater.dataSize);
 -        currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
 -        return updater.colUpdateTimeDelta;
 -    }
 -
 -    // for debugging
 -    public String contents()
 -    {
 -        StringBuilder builder = new StringBuilder();
 -        builder.append("{");
 -        for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
 -        {
 -            builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
 -        }
 -        builder.append("}");
 -        return builder.toString();
 +        long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
++        minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp);
 +        liveDataSize.addAndGet(initialSize + pair[0]);
 +        columnsCollector.update(update.columns());
 +        statsCollector.update(update.stats());
 +        currentOperations.addAndGet(update.operationCount());
 +        return pair[1];
      }
  
      public int partitionCount()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 259e1b9,00d1344..5cb60c5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -19,9 -19,6 +19,11 @@@ package org.apache.cassandra.db.compact
  
  import java.util.*;
  
++import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import com.google.common.collect.Iterables;
 +
++import org.apache.cassandra.db.partitions.Partition;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -195,6 -190,13 +201,13 @@@ public class CompactionController imple
              else if (sstable.getBloomFilter().isPresent(key))
                  min = Math.min(min, sstable.getMinTimestamp());
          }
+ 
+         for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+         {
 -            ColumnFamily cf = memtable.getColumnFamily(key);
 -            if (cf != null)
 -                min = Math.min(min, memtable.getMinTimestamp());
++            Partition partition = memtable.getPartition(key);
++            if (partition != null)
++                min = Math.min(min, partition.stats().minTimestamp);
+         }
          return min;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 0000000,750a38e..e781716
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -1,0 -1,191 +1,195 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.Set;
+ 
+ import com.google.common.collect.Sets;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.Util;
++import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
 -import org.apache.cassandra.db.composites.CellName;
++import org.apache.cassandra.db.RowUpdateBuilder;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.locator.SimpleStrategy;
++import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
 -import static org.apache.cassandra.Util.cellname;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+ 
+ public class CompactionControllerTest extends SchemaLoader
+ {
+     private static final String KEYSPACE = "CompactionControllerTest";
+     private static final String CF1 = "Standard1";
+     private static final String CF2 = "Standard2";
+ 
+     @BeforeClass
+     public static void defineSchema() throws ConfigurationException
+     {
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE,
 -                                    SimpleStrategy.class,
 -                                    KSMetaData.optsWithRF(1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
++                                    KeyspaceParams.simple(1),
++                                    CFMetaData.Builder.create(KEYSPACE, CF1, true, false, false)
++                                                      .addPartitionKey("pk", AsciiType.instance)
++                                                      .addClusteringColumn("ck", AsciiType.instance)
++                                                      .addRegularColumn("val", AsciiType.instance)
++                                                      .build(),
++                                    CFMetaData.Builder.create(KEYSPACE, CF2, true, false, false)
++                                                      .addPartitionKey("pk", AsciiType.instance)
++                                                      .addClusteringColumn("ck", AsciiType.instance)
++                                                      .addRegularColumn("val", AsciiType.instance)
++                                                      .build());
+     }
+ 
+     @Test
+     public void testMaxPurgeableTimestamp()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+         cfs.truncateBlocking();
+ 
 -        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
 -        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
++        DecoratedKey key = Util.dk("k1");
+ 
+         long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+         long timestamp2 = timestamp1 - 5;
+         long timestamp3 = timestamp2 - 5; // oldest timestamp
+ 
+         // add to first memtable
 -        applyMutation(CF1, rowKey, timestamp1);
++        applyMutation(cfs.metadata, key, timestamp1);
+ 
+         // check max purgeable timestamp without any sstables
+         try(CompactionController controller = new CompactionController(cfs, null, 0))
+         {
+             assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+ 
+             cfs.forceBlockingFlush();
+             assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+         }
+ 
 -        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
++        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
+ 
+         // create another sstable
 -        applyMutation(CF1, rowKey, timestamp2);
++        applyMutation(cfs.metadata, key, timestamp2);
+         cfs.forceBlockingFlush();
+ 
+         // check max purgeable timestamp when compacting the first sstable with and without a memtable
+         try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+         {
+             assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+ 
 -            applyMutation(CF1, rowKey, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp3);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+         }
+ 
+         // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+         cfs.forceBlockingFlush();
+ 
+         //newest to oldest
+         try (CompactionController controller = new CompactionController(cfs, null, 0))
+         {
 -            applyMutation(CF1, rowKey, timestamp1);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp1);
++            applyMutation(cfs.metadata, key, timestamp2);
++            applyMutation(cfs.metadata, key, timestamp3);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+         }
+ 
+         cfs.forceBlockingFlush();
+ 
+         //oldest to newest
+         try (CompactionController controller = new CompactionController(cfs, null, 0))
+         {
 -            applyMutation(CF1, rowKey, timestamp3);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp1);
++            applyMutation(cfs.metadata, key, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp2);
++            applyMutation(cfs.metadata, key, timestamp1);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+         }
+     }
+ 
+     @Test
+     public void testGetFullyExpiredSSTables()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+         cfs.truncateBlocking();
+ 
 -        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
++        DecoratedKey key = Util.dk("k1");
+ 
+         long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+         long timestamp2 = timestamp1 - 5;
+         long timestamp3 = timestamp2 - 5; // oldest timestamp
+ 
+         // create an sstable with a tombstone that should be expired as long as no older timestamps exist
 -        applyDeleteMutation(CF2, rowKey, timestamp2);
++        applyDeleteMutation(cfs.metadata, key, timestamp2);
+         cfs.forceBlockingFlush();
+ 
+         // first sstable with tombstone is compacting
 -        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
++        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables());
+ 
+         // create another sstable with more recent timestamp
 -        applyMutation(CF2, rowKey, timestamp1);
++        applyMutation(cfs.metadata, key, timestamp1);
+         cfs.forceBlockingFlush();
+ 
+         // second sstable is overlapping
 -        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
++        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getLiveSSTables()), compacting);
+ 
+         // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+         int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+         Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+         assertNotNull(expired);
+         assertEquals(1, expired.size());
+         assertEquals(compacting.iterator().next(), expired.iterator().next());
+ 
+         // however if we add an older mutation to the memtable then the sstable should not be expired
 -        applyMutation(CF2, rowKey, timestamp3);
++        applyMutation(cfs.metadata, key, timestamp3);
+         expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+         assertNotNull(expired);
+         assertEquals(0, expired.size());
+     }
+ 
 -    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
++    private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+     {
 -        CellName colName = cellname("birthdate");
+         ByteBuffer val = ByteBufferUtil.bytes(1L);
+ 
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.add(cf, colName, val, timestamp);
 -        rm.applyUnsafe();
++        new RowUpdateBuilder(cfm, timestamp, key)
++        .clustering("ck")
++        .add("val", val)
++        .build()
++        .applyUnsafe();
+     }
+ 
 -    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
++    private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+     {
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.delete(cf, timestamp);
 -        rm.applyUnsafe();
++        new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
++        .applyUnsafe();
+     }
 -
 -
 -
+ }


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442f4737
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442f4737
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442f4737

Branch: refs/heads/trunk
Commit: 442f4737ceddd34f14210da49cee4d48b468f01e
Parents: 2f8e5f3 b5d6d4f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:05:26 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:05:59 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |   9 +
 .../db/compaction/CompactionController.java     |  19 +-
 .../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
 4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bb26b20,cdc3b34..70abffe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,8 +1,31 @@@
 +3.0.3
 + * Update CQL documentation (CASSANDRA-10899)
 + * Check the column name, not cell name, for dropped columns when reading
 +   legacy sstables (CASSANDRA-11018)
 + * Don't attempt to index clustering values of static rows (CASSANDRA-11021)
 + * Remove checksum files after replaying hints (CASSANDRA-10947)
 + * Support passing base table metadata to custom 2i validation (CASSANDRA-10924)
 + * Ensure stale index entries are purged during reads (CASSANDRA-11013)
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
  2.2.5
+  * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
   * Apply change to compaction throughput in real time (CASSANDRA-10025)
   * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
 - * Avoid over-fetching during the page of range queries (CASSANDRA-8521)
   * Start L0 STCS-compactions even if there is a L0 -> L1 compaction
     going (CASSANDRA-10979)
   * Make UUID LSB unique per process (CASSANDRA-7925)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 96b1775,fb4da72..5d5f7bf
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -237,12 -227,24 +240,13 @@@ public class Memtable implements Compar
              }
          }
  
 -        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
 -        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
 -        liveDataSize.addAndGet(initialSize + updater.dataSize);
 -        currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
 -        return updater.colUpdateTimeDelta;
 -    }
 -
 -    // for debugging
 -    public String contents()
 -    {
 -        StringBuilder builder = new StringBuilder();
 -        builder.append("{");
 -        for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
 -        {
 -            builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
 -        }
 -        builder.append("}");
 -        return builder.toString();
 +        long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
++        minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp);
 +        liveDataSize.addAndGet(initialSize + pair[0]);
 +        columnsCollector.update(update.columns());
 +        statsCollector.update(update.stats());
 +        currentOperations.addAndGet(update.operationCount());
 +        return pair[1];
      }
  
      public int partitionCount()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 259e1b9,00d1344..5cb60c5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -19,9 -19,6 +19,11 @@@ package org.apache.cassandra.db.compact
  
  import java.util.*;
  
++import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import com.google.common.collect.Iterables;
 +
++import org.apache.cassandra.db.partitions.Partition;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -195,6 -190,13 +201,13 @@@ public class CompactionController imple
              else if (sstable.getBloomFilter().isPresent(key))
                  min = Math.min(min, sstable.getMinTimestamp());
          }
+ 
+         for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+         {
 -            ColumnFamily cf = memtable.getColumnFamily(key);
 -            if (cf != null)
 -                min = Math.min(min, memtable.getMinTimestamp());
++            Partition partition = memtable.getPartition(key);
++            if (partition != null)
++                min = Math.min(min, partition.stats().minTimestamp);
+         }
          return min;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 0000000,750a38e..e781716
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -1,0 -1,191 +1,195 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.Set;
+ 
+ import com.google.common.collect.Sets;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.Util;
++import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
 -import org.apache.cassandra.db.composites.CellName;
++import org.apache.cassandra.db.RowUpdateBuilder;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.locator.SimpleStrategy;
++import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
 -import static org.apache.cassandra.Util.cellname;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+ 
+ public class CompactionControllerTest extends SchemaLoader
+ {
+     private static final String KEYSPACE = "CompactionControllerTest";
+     private static final String CF1 = "Standard1";
+     private static final String CF2 = "Standard2";
+ 
+     @BeforeClass
+     public static void defineSchema() throws ConfigurationException
+     {
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE,
 -                                    SimpleStrategy.class,
 -                                    KSMetaData.optsWithRF(1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
++                                    KeyspaceParams.simple(1),
++                                    CFMetaData.Builder.create(KEYSPACE, CF1, true, false, false)
++                                                      .addPartitionKey("pk", AsciiType.instance)
++                                                      .addClusteringColumn("ck", AsciiType.instance)
++                                                      .addRegularColumn("val", AsciiType.instance)
++                                                      .build(),
++                                    CFMetaData.Builder.create(KEYSPACE, CF2, true, false, false)
++                                                      .addPartitionKey("pk", AsciiType.instance)
++                                                      .addClusteringColumn("ck", AsciiType.instance)
++                                                      .addRegularColumn("val", AsciiType.instance)
++                                                      .build());
+     }
+ 
+     @Test
+     public void testMaxPurgeableTimestamp()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+         cfs.truncateBlocking();
+ 
 -        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
 -        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
++        DecoratedKey key = Util.dk("k1");
+ 
+         long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+         long timestamp2 = timestamp1 - 5;
+         long timestamp3 = timestamp2 - 5; // oldest timestamp
+ 
+         // add to first memtable
 -        applyMutation(CF1, rowKey, timestamp1);
++        applyMutation(cfs.metadata, key, timestamp1);
+ 
+         // check max purgeable timestamp without any sstables
+         try(CompactionController controller = new CompactionController(cfs, null, 0))
+         {
+             assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+ 
+             cfs.forceBlockingFlush();
+             assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+         }
+ 
 -        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
++        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
+ 
+         // create another sstable
 -        applyMutation(CF1, rowKey, timestamp2);
++        applyMutation(cfs.metadata, key, timestamp2);
+         cfs.forceBlockingFlush();
+ 
+         // check max purgeable timestamp when compacting the first sstable with and without a memtable
+         try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+         {
+             assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+ 
 -            applyMutation(CF1, rowKey, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp3);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+         }
+ 
+         // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+         cfs.forceBlockingFlush();
+ 
+         //newest to oldest
+         try (CompactionController controller = new CompactionController(cfs, null, 0))
+         {
 -            applyMutation(CF1, rowKey, timestamp1);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp1);
++            applyMutation(cfs.metadata, key, timestamp2);
++            applyMutation(cfs.metadata, key, timestamp3);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+         }
+ 
+         cfs.forceBlockingFlush();
+ 
+         //oldest to newest
+         try (CompactionController controller = new CompactionController(cfs, null, 0))
+         {
 -            applyMutation(CF1, rowKey, timestamp3);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp1);
++            applyMutation(cfs.metadata, key, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp2);
++            applyMutation(cfs.metadata, key, timestamp1);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+         }
+     }
+ 
+     @Test
+     public void testGetFullyExpiredSSTables()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+         cfs.truncateBlocking();
+ 
 -        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
++        DecoratedKey key = Util.dk("k1");
+ 
+         long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+         long timestamp2 = timestamp1 - 5;
+         long timestamp3 = timestamp2 - 5; // oldest timestamp
+ 
+         // create an sstable with a tombstone that should be expired as long as no older timestamps exist
 -        applyDeleteMutation(CF2, rowKey, timestamp2);
++        applyDeleteMutation(cfs.metadata, key, timestamp2);
+         cfs.forceBlockingFlush();
+ 
+         // first sstable with tombstone is compacting
 -        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
++        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables());
+ 
+         // create another sstable with more recent timestamp
 -        applyMutation(CF2, rowKey, timestamp1);
++        applyMutation(cfs.metadata, key, timestamp1);
+         cfs.forceBlockingFlush();
+ 
+         // second sstable is overlapping
 -        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
++        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getLiveSSTables()), compacting);
+ 
+         // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+         int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+         Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+         assertNotNull(expired);
+         assertEquals(1, expired.size());
+         assertEquals(compacting.iterator().next(), expired.iterator().next());
+ 
+         // however if we add an older mutation to the memtable then the sstable should not be expired
 -        applyMutation(CF2, rowKey, timestamp3);
++        applyMutation(cfs.metadata, key, timestamp3);
+         expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+         assertNotNull(expired);
+         assertEquals(0, expired.size());
+     }
+ 
 -    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
++    private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+     {
 -        CellName colName = cellname("birthdate");
+         ByteBuffer val = ByteBufferUtil.bytes(1L);
+ 
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.add(cf, colName, val, timestamp);
 -        rm.applyUnsafe();
++        new RowUpdateBuilder(cfm, timestamp, key)
++        .clustering("ck")
++        .add("val", val)
++        .build()
++        .applyUnsafe();
+     }
+ 
 -    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
++    private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+     {
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.delete(cf, timestamp);
 -        rm.applyUnsafe();
++        new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
++        .applyUnsafe();
+     }
 -
 -
 -
+ }


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.3

Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.3


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f2174280
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f2174280
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f2174280

Branch: refs/heads/trunk
Commit: f2174280d6dcbb951b539dc558d1e1ae81f43b5c
Parents: acb7fed 442f473
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:09:15 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:09:15 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |   9 +
 .../db/compaction/CompactionController.java     |  19 +-
 .../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
 4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ed56a78,70abffe..81657f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,9 -11,20 +15,10 @@@ Merged from 3.0
     tombstone (CASSANDRA-10743)
   * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
   * Fix potential assertion error during compaction (CASSANDRA-10944)
 - * Fix counting of received sstables in streaming (CASSANDRA-10949)
 - * Implement hints compression (CASSANDRA-9428)
 - * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 - * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 - * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 - * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 - * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 - * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 - * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 - * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
  Merged from 2.2:
 -2.2.5
+  * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
   * Apply change to compaction throughput in real time (CASSANDRA-10025)
 + * (cqlsh) encode input correctly when saving history
   * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
   * Start L0 STCS-compactions even if there is a L0 -> L1 compaction
     going (CASSANDRA-10979)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 8e7a43c,5d5f7bf..952c045
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -334,37 -316,24 +338,42 @@@ public class Memtable implements Compar
          return creationTime;
      }
  
+     public long getMinTimestamp()
+     {
+         return minTimestamp;
+     }
+ 
 -    class FlushRunnable extends DiskAwareRunnable
 +    class FlushRunnable implements Callable<SSTableMultiWriter>
      {
 -        private final ReplayPosition context;
 +        public final ReplayPosition context;
          private final long estimatedSize;
 +        private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
  
          private final boolean isBatchLogTable;
 +        private final SSTableMultiWriter writer;
 +
 +        // keeping these to be able to log what we are actually flushing
 +        private final PartitionPosition from;
 +        private final PartitionPosition to;
  
 -        FlushRunnable(ReplayPosition context)
 +        FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
          {
 -            this.context = context;
 +            this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
 +        }
 +
 +        FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
 +        {
 +            this(context, partitions, null, null, null, txn);
 +        }
  
 +        FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
 +        {
 +            this.context = context;
 +            this.toFlush = toFlush;
 +            this.from = from;
 +            this.to = to;
              long keySize = 0;
 -            for (PartitionPosition key : partitions.keySet())
 +            for (PartitionPosition key : toFlush.keySet())
              {
                  //  make sure we don't write non-sensical keys
                  assert key instanceof DecoratedKey;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index a7224a1,5cb60c5..79c6d53
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -190,10 -196,18 +196,17 @@@ public class CompactionController imple
          {
              // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
              // we check index file instead.
 -            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 -                min = Math.min(min, sstable.getMinTimestamp());
 -            else if (sstable.getBloomFilter().isPresent(key))
 +            if ((sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
 +                || sstable.getBloomFilter().isPresent(key))
                  min = Math.min(min, sstable.getMinTimestamp());
          }
+ 
+         for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+         {
+             Partition partition = memtable.getPartition(key);
+             if (partition != null)
+                 min = Math.min(min, partition.stats().minTimestamp);
+         }
          return min;
      }
  


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442f4737
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442f4737
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442f4737

Branch: refs/heads/cassandra-3.3
Commit: 442f4737ceddd34f14210da49cee4d48b468f01e
Parents: 2f8e5f3 b5d6d4f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:05:26 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:05:59 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |   9 +
 .../db/compaction/CompactionController.java     |  19 +-
 .../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
 4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bb26b20,cdc3b34..70abffe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,8 +1,31 @@@
 +3.0.3
 + * Update CQL documentation (CASSANDRA-10899)
 + * Check the column name, not cell name, for dropped columns when reading
 +   legacy sstables (CASSANDRA-11018)
 + * Don't attempt to index clustering values of static rows (CASSANDRA-11021)
 + * Remove checksum files after replaying hints (CASSANDRA-10947)
 + * Support passing base table metadata to custom 2i validation (CASSANDRA-10924)
 + * Ensure stale index entries are purged during reads (CASSANDRA-11013)
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
  2.2.5
+  * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
   * Apply change to compaction throughput in real time (CASSANDRA-10025)
   * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
 - * Avoid over-fetching during the page of range queries (CASSANDRA-8521)
   * Start L0 STCS-compactions even if there is a L0 -> L1 compaction
     going (CASSANDRA-10979)
   * Make UUID LSB unique per process (CASSANDRA-7925)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 96b1775,fb4da72..5d5f7bf
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -237,12 -227,24 +240,13 @@@ public class Memtable implements Compar
              }
          }
  
 -        final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
 -        minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
 -        liveDataSize.addAndGet(initialSize + updater.dataSize);
 -        currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
 -        return updater.colUpdateTimeDelta;
 -    }
 -
 -    // for debugging
 -    public String contents()
 -    {
 -        StringBuilder builder = new StringBuilder();
 -        builder.append("{");
 -        for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
 -        {
 -            builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
 -        }
 -        builder.append("}");
 -        return builder.toString();
 +        long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
++        minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp);
 +        liveDataSize.addAndGet(initialSize + pair[0]);
 +        columnsCollector.update(update.columns());
 +        statsCollector.update(update.stats());
 +        currentOperations.addAndGet(update.operationCount());
 +        return pair[1];
      }
  
      public int partitionCount()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 259e1b9,00d1344..5cb60c5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -19,9 -19,6 +19,11 @@@ package org.apache.cassandra.db.compact
  
  import java.util.*;
  
++import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import com.google.common.collect.Iterables;
 +
++import org.apache.cassandra.db.partitions.Partition;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -195,6 -190,13 +201,13 @@@ public class CompactionController imple
              else if (sstable.getBloomFilter().isPresent(key))
                  min = Math.min(min, sstable.getMinTimestamp());
          }
+ 
+         for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+         {
 -            ColumnFamily cf = memtable.getColumnFamily(key);
 -            if (cf != null)
 -                min = Math.min(min, memtable.getMinTimestamp());
++            Partition partition = memtable.getPartition(key);
++            if (partition != null)
++                min = Math.min(min, partition.stats().minTimestamp);
+         }
          return min;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 0000000,750a38e..e781716
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -1,0 -1,191 +1,195 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.Set;
+ 
+ import com.google.common.collect.Sets;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.Util;
++import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
 -import org.apache.cassandra.db.composites.CellName;
++import org.apache.cassandra.db.RowUpdateBuilder;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.locator.SimpleStrategy;
++import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
 -import static org.apache.cassandra.Util.cellname;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+ 
+ public class CompactionControllerTest extends SchemaLoader
+ {
+     private static final String KEYSPACE = "CompactionControllerTest";
+     private static final String CF1 = "Standard1";
+     private static final String CF2 = "Standard2";
+ 
+     @BeforeClass
+     public static void defineSchema() throws ConfigurationException
+     {
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE,
 -                                    SimpleStrategy.class,
 -                                    KSMetaData.optsWithRF(1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE, CF1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE, CF2));
++                                    KeyspaceParams.simple(1),
++                                    CFMetaData.Builder.create(KEYSPACE, CF1, true, false, false)
++                                                      .addPartitionKey("pk", AsciiType.instance)
++                                                      .addClusteringColumn("ck", AsciiType.instance)
++                                                      .addRegularColumn("val", AsciiType.instance)
++                                                      .build(),
++                                    CFMetaData.Builder.create(KEYSPACE, CF2, true, false, false)
++                                                      .addPartitionKey("pk", AsciiType.instance)
++                                                      .addClusteringColumn("ck", AsciiType.instance)
++                                                      .addRegularColumn("val", AsciiType.instance)
++                                                      .build());
+     }
+ 
+     @Test
+     public void testMaxPurgeableTimestamp()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+         cfs.truncateBlocking();
+ 
 -        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
 -        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
++        DecoratedKey key = Util.dk("k1");
+ 
+         long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+         long timestamp2 = timestamp1 - 5;
+         long timestamp3 = timestamp2 - 5; // oldest timestamp
+ 
+         // add to first memtable
 -        applyMutation(CF1, rowKey, timestamp1);
++        applyMutation(cfs.metadata, key, timestamp1);
+ 
+         // check max purgeable timestamp without any sstables
+         try(CompactionController controller = new CompactionController(cfs, null, 0))
+         {
+             assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+ 
+             cfs.forceBlockingFlush();
+             assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+         }
+ 
 -        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
++        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
+ 
+         // create another sstable
 -        applyMutation(CF1, rowKey, timestamp2);
++        applyMutation(cfs.metadata, key, timestamp2);
+         cfs.forceBlockingFlush();
+ 
+         // check max purgeable timestamp when compacting the first sstable with and without a memtable
+         try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+         {
+             assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+ 
 -            applyMutation(CF1, rowKey, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp3);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+         }
+ 
+         // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+         cfs.forceBlockingFlush();
+ 
+         //newest to oldest
+         try (CompactionController controller = new CompactionController(cfs, null, 0))
+         {
 -            applyMutation(CF1, rowKey, timestamp1);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp1);
++            applyMutation(cfs.metadata, key, timestamp2);
++            applyMutation(cfs.metadata, key, timestamp3);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+         }
+ 
+         cfs.forceBlockingFlush();
+ 
+         //oldest to newest
+         try (CompactionController controller = new CompactionController(cfs, null, 0))
+         {
 -            applyMutation(CF1, rowKey, timestamp3);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp1);
++            applyMutation(cfs.metadata, key, timestamp3);
++            applyMutation(cfs.metadata, key, timestamp2);
++            applyMutation(cfs.metadata, key, timestamp1);
+ 
+             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+         }
+     }
+ 
+     @Test
+     public void testGetFullyExpiredSSTables()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+         cfs.truncateBlocking();
+ 
 -        ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
++        DecoratedKey key = Util.dk("k1");
+ 
+         long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+         long timestamp2 = timestamp1 - 5;
+         long timestamp3 = timestamp2 - 5; // oldest timestamp
+ 
+         // create sstable with tombstone that should be expired in no older timestamps
 -        applyDeleteMutation(CF2, rowKey, timestamp2);
++        applyDeleteMutation(cfs.metadata, key, timestamp2);
+         cfs.forceBlockingFlush();
+ 
+         // first sstable with tombstone is compacting
 -        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
++        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables());
+ 
+         // create another sstable with more recent timestamp
 -        applyMutation(CF2, rowKey, timestamp1);
++        applyMutation(cfs.metadata, key, timestamp1);
+         cfs.forceBlockingFlush();
+ 
+         // second sstable is overlapping
 -        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
++        Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getLiveSSTables()), compacting);
+ 
+         // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+         int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+         Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+         assertNotNull(expired);
+         assertEquals(1, expired.size());
+         assertEquals(compacting.iterator().next(), expired.iterator().next());
+ 
+         // however if we add an older mutation to the memtable then the sstable should not be expired
 -        applyMutation(CF2, rowKey, timestamp3);
++        applyMutation(cfs.metadata, key, timestamp3);
+         expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+         assertNotNull(expired);
+         assertEquals(0, expired.size());
+     }
+ 
 -    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
++    private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+     {
 -        CellName colName = cellname("birthdate");
+         ByteBuffer val = ByteBufferUtil.bytes(1L);
+ 
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.add(cf, colName, val, timestamp);
 -        rm.applyUnsafe();
++        new RowUpdateBuilder(cfm, timestamp, key)
++        .clustering("ck")
++        .add("val", val)
++        .build()
++        .applyUnsafe();
+     }
+ 
 -    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
++    private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+     {
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.delete(cf, timestamp);
 -        rm.applyUnsafe();
++        new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
++        .applyUnsafe();
+     }
 -
 -
 -
+ }