You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/01/25 10:13:16 UTC
[01/10] cassandra git commit: maxPurgeableTimestamp needs to check
memtables too
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 b4d67c9e5 -> b5d6d4f72
refs/heads/cassandra-3.0 2f8e5f346 -> 442f4737c
refs/heads/cassandra-3.3 acb7fed1d -> f2174280d
refs/heads/trunk 72790dc8e -> 4abd99363
maxPurgeableTimestamp needs to check memtables too
Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7
Branch: refs/heads/cassandra-2.2
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/AtomicBTreeColumns.java | 14 +-
src/java/org/apache/cassandra/db/Memtable.java | 15 +-
.../db/compaction/CompactionController.java | 21 +-
.../db/compaction/LazilyCompactedRow.java | 2 +-
.../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
* Avoid over-fetching during the page of range queries (CASSANDRA-8521)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
*
* @return the difference in size seen after merging the given columns
*/
- public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
{
indexer.updateRowLevelIndexes();
updater.finish();
- return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+ return updater;
}
else if (!monitorOwned)
{
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
}
// the function we provide to the btree utilities to perform any column replacements
- private static final class ColumnUpdater implements UpdateFunction<Cell>
+ static final class ColumnUpdater implements UpdateFunction<Cell>
{
final AtomicBTreeColumns updating;
final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
long colUpdateTimeDelta = Long.MAX_VALUE;
final MemtableAllocator.DataReclaimer reclaimer;
List<Cell> inserted; // TODO: replace with walk of aborted BTree
+ long minTimestamp = Long.MAX_VALUE;
private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(insert);
+ minTimestamp = Math.min(minTimestamp, insert.timestamp());
return insert;
}
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
{
Cell reconciled = existing.reconcile(update);
indexer.update(existing, reconciled);
+ // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+ // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+ // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+ // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+ minTimestamp = Math.min(minTimestamp, update.timestamp());
if (existing != reconciled)
{
reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
inserted.clear();
}
reclaimer.cancel();
+ minTimestamp = Long.MAX_VALUE;
}
protected void abort(Cell abort)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
private final long creationTime = System.currentTimeMillis();
private final long creationNano = System.nanoTime();
+ // The smallest timestamp for all partitions stored in this memtable
+ private long minTimestamp = Long.MAX_VALUE;
+
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
}
}
- final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
- liveDataSize.addAndGet(initialSize + pair.left);
+ final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+ minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+ liveDataSize.addAndGet(initialSize + updater.dataSize);
currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
- return pair.right;
+ return updater.colUpdateTimeDelta;
}
// for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
return creationTime;
}
+ public long getMinTimestamp()
+ {
+ return minTimestamp;
+ }
+
class FlushRunnable extends DiskAwareRunnable
{
private final ReplayPosition context;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
* Finds expired sstables
*
* works something like this;
- * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+ * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
* 2. build a list of fully expired candidates
* 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
* - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
}
+ for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+ minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
// At this point, minTimestamp denotes the lowest timestamp of any relevant
- // SSTable that contains a constructive value. candidates contains all the
+ // SSTable or Memtable that contains a constructive value. candidates contains all the
// candidates with no constructive values. The ones out of these that have
// (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
* @return the largest timestamp before which it's okay to drop tombstones for the given partition;
* i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
* in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
- * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+ * participating in this compaction, or memtable that contains this partition,
+ * or LONG.MAX_VALUE if no SSTable or memtable exist.
*/
public long maxPurgeableTimestamp(DecoratedKey key)
{
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
else if (sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
+ ColumnFamily cf = memtable.getColumnFamily(key);
+ if (cf != null)
+ min = Math.min(min, memtable.getMinTimestamp());
+ }
return min;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
RangeTombstone t = tombstone;
tombstone = null;
- if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+ if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
{
indexBuilder.tombstoneTracker().update(t, true);
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+ private static final String KEYSPACE = "CompactionControllerTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF2));
+ }
+
+ @Test
+ public void testMaxPurgeableTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+ DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // add to first memtable
+ applyMutation(CF1, rowKey, timestamp1);
+
+ // check max purgeable timestamp without any sstables
+ try(CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+ cfs.forceBlockingFlush();
+ assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+ }
+
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+ // create another sstable
+ applyMutation(CF1, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // check max purgeable timestamp when compacting the first sstable with and without a memtable
+ try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ }
+
+ // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+ cfs.forceBlockingFlush();
+
+ //newest to oldest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp1);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+
+ cfs.forceBlockingFlush();
+
+ //oldest to newest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp3);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp1);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+ }
+
+ @Test
+ public void testGetFullyExpiredSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // create sstable with tombstone that should be expired in no older timestamps
+ applyDeleteMutation(CF2, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // first sstable with tombstone is compacting
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+ // create another sstable with more recent timestamp
+ applyMutation(CF2, rowKey, timestamp1);
+ cfs.forceBlockingFlush();
+
+ // second sstable is overlapping
+ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+ // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+ int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(1, expired.size());
+ assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+ // however if we add an older mutation to the memtable then the sstable should not be expired
+ applyMutation(CF2, rowKey, timestamp3);
+ expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(0, expired.size());
+ }
+
+ private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ CellName colName = cellname("birthdate");
+ ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.add(cf, colName, val, timestamp);
+ rm.applyUnsafe();
+ }
+
+ private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.delete(cf, timestamp);
+ rm.applyUnsafe();
+ }
+
+
+
+}
[03/10] cassandra git commit: maxPurgeableTimestamp needs to check
memtables too
Posted by ma...@apache.org.
maxPurgeableTimestamp needs to check memtables too
Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7
Branch: refs/heads/cassandra-3.3
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/AtomicBTreeColumns.java | 14 +-
src/java/org/apache/cassandra/db/Memtable.java | 15 +-
.../db/compaction/CompactionController.java | 21 +-
.../db/compaction/LazilyCompactedRow.java | 2 +-
.../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
* Avoid over-fetching during the page of range queries (CASSANDRA-8521)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
*
* @return the difference in size seen after merging the given columns
*/
- public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
{
indexer.updateRowLevelIndexes();
updater.finish();
- return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+ return updater;
}
else if (!monitorOwned)
{
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
}
// the function we provide to the btree utilities to perform any column replacements
- private static final class ColumnUpdater implements UpdateFunction<Cell>
+ static final class ColumnUpdater implements UpdateFunction<Cell>
{
final AtomicBTreeColumns updating;
final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
long colUpdateTimeDelta = Long.MAX_VALUE;
final MemtableAllocator.DataReclaimer reclaimer;
List<Cell> inserted; // TODO: replace with walk of aborted BTree
+ long minTimestamp = Long.MAX_VALUE;
private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(insert);
+ minTimestamp = Math.min(minTimestamp, insert.timestamp());
return insert;
}
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
{
Cell reconciled = existing.reconcile(update);
indexer.update(existing, reconciled);
+ // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+ // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+ // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+ // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+ minTimestamp = Math.min(minTimestamp, update.timestamp());
if (existing != reconciled)
{
reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
inserted.clear();
}
reclaimer.cancel();
+ minTimestamp = Long.MAX_VALUE;
}
protected void abort(Cell abort)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
private final long creationTime = System.currentTimeMillis();
private final long creationNano = System.nanoTime();
+ // The smallest timestamp for all partitions stored in this memtable
+ private long minTimestamp = Long.MAX_VALUE;
+
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
}
}
- final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
- liveDataSize.addAndGet(initialSize + pair.left);
+ final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+ minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+ liveDataSize.addAndGet(initialSize + updater.dataSize);
currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
- return pair.right;
+ return updater.colUpdateTimeDelta;
}
// for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
return creationTime;
}
+ public long getMinTimestamp()
+ {
+ return minTimestamp;
+ }
+
class FlushRunnable extends DiskAwareRunnable
{
private final ReplayPosition context;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
* Finds expired sstables
*
* works something like this;
- * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+ * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
* 2. build a list of fully expired candidates
* 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
* - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
}
+ for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+ minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
// At this point, minTimestamp denotes the lowest timestamp of any relevant
- // SSTable that contains a constructive value. candidates contains all the
+ // SSTable or Memtable that contains a constructive value. candidates contains all the
// candidates with no constructive values. The ones out of these that have
// (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
* @return the largest timestamp before which it's okay to drop tombstones for the given partition;
* i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
* in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
- * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+ * participating in this compaction, or memtable that contains this partition,
+ * or LONG.MAX_VALUE if no SSTable or memtable exist.
*/
public long maxPurgeableTimestamp(DecoratedKey key)
{
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
else if (sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
+ ColumnFamily cf = memtable.getColumnFamily(key);
+ if (cf != null)
+ min = Math.min(min, memtable.getMinTimestamp());
+ }
return min;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
RangeTombstone t = tombstone;
tombstone = null;
- if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+ if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
{
indexBuilder.tombstoneTracker().update(t, true);
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+ private static final String KEYSPACE = "CompactionControllerTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF2));
+ }
+
+ @Test
+ public void testMaxPurgeableTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+ DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // add to first memtable
+ applyMutation(CF1, rowKey, timestamp1);
+
+ // check max purgeable timestamp without any sstables
+ try(CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+ cfs.forceBlockingFlush();
+ assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+ }
+
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+ // create another sstable
+ applyMutation(CF1, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // check max purgeable timestamp when compacting the first sstable with and without a memtable
+ try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ }
+
+ // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+ cfs.forceBlockingFlush();
+
+ //newest to oldest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp1);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+
+ cfs.forceBlockingFlush();
+
+ //oldest to newest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp3);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp1);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+ }
+
+ @Test
+ public void testGetFullyExpiredSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // create sstable with tombstone that should be expired in no older timestamps
+ applyDeleteMutation(CF2, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // first sstable with tombstone is compacting
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+ // create another sstable with more recent timestamp
+ applyMutation(CF2, rowKey, timestamp1);
+ cfs.forceBlockingFlush();
+
+ // second sstable is overlapping
+ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+ // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+ int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(1, expired.size());
+ assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+ // however if we add an older mutation to the memtable then the sstable should not be expired
+ applyMutation(CF2, rowKey, timestamp3);
+ expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(0, expired.size());
+ }
+
+ private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ CellName colName = cellname("birthdate");
+ ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.add(cf, colName, val, timestamp);
+ rm.applyUnsafe();
+ }
+
+ private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.delete(cf, timestamp);
+ rm.applyUnsafe();
+ }
+
+
+
+}
[10/10] cassandra git commit: Merge branch 'cassandra-3.3' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-3.3' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4abd9936
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4abd9936
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4abd9936
Branch: refs/heads/trunk
Commit: 4abd9936342f5faf3e23c3d395e24278a3b3bfbd
Parents: 72790dc f217428
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:09:31 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:09:31 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 9 +
.../db/compaction/CompactionController.java | 19 +-
.../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4abd9936/CHANGES.txt
----------------------------------------------------------------------
[04/10] cassandra git commit: maxPurgeableTimestamp needs to check
memtables too
Posted by ma...@apache.org.
maxPurgeableTimestamp needs to check memtables too
Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7
Branch: refs/heads/trunk
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/AtomicBTreeColumns.java | 14 +-
src/java/org/apache/cassandra/db/Memtable.java | 15 +-
.../db/compaction/CompactionController.java | 21 +-
.../db/compaction/LazilyCompactedRow.java | 2 +-
.../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
* Avoid over-fetching during the page of range queries (CASSANDRA-8521)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
*
* @return the difference in size seen after merging the given columns
*/
- public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
{
indexer.updateRowLevelIndexes();
updater.finish();
- return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+ return updater;
}
else if (!monitorOwned)
{
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
}
// the function we provide to the btree utilities to perform any column replacements
- private static final class ColumnUpdater implements UpdateFunction<Cell>
+ static final class ColumnUpdater implements UpdateFunction<Cell>
{
final AtomicBTreeColumns updating;
final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
long colUpdateTimeDelta = Long.MAX_VALUE;
final MemtableAllocator.DataReclaimer reclaimer;
List<Cell> inserted; // TODO: replace with walk of aborted BTree
+ long minTimestamp = Long.MAX_VALUE;
private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(insert);
+ minTimestamp = Math.min(minTimestamp, insert.timestamp());
return insert;
}
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
{
Cell reconciled = existing.reconcile(update);
indexer.update(existing, reconciled);
+ // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+ // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+ // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+ // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+ minTimestamp = Math.min(minTimestamp, update.timestamp());
if (existing != reconciled)
{
reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
inserted.clear();
}
reclaimer.cancel();
+ minTimestamp = Long.MAX_VALUE;
}
protected void abort(Cell abort)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
private final long creationTime = System.currentTimeMillis();
private final long creationNano = System.nanoTime();
+ // The smallest timestamp for all partitions stored in this memtable
+ private long minTimestamp = Long.MAX_VALUE;
+
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
}
}
- final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
- liveDataSize.addAndGet(initialSize + pair.left);
+ final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+ minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+ liveDataSize.addAndGet(initialSize + updater.dataSize);
currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
- return pair.right;
+ return updater.colUpdateTimeDelta;
}
// for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
return creationTime;
}
+ public long getMinTimestamp()
+ {
+ return minTimestamp;
+ }
+
class FlushRunnable extends DiskAwareRunnable
{
private final ReplayPosition context;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
* Finds expired sstables
*
* works something like this;
- * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+ * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
* 2. build a list of fully expired candidates
* 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
* - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
}
+ for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+ minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
// At this point, minTimestamp denotes the lowest timestamp of any relevant
- // SSTable that contains a constructive value. candidates contains all the
+ // SSTable or Memtable that contains a constructive value. candidates contains all the
// candidates with no constructive values. The ones out of these that have
// (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
* @return the largest timestamp before which it's okay to drop tombstones for the given partition;
* i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
* in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
- * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+ * participating in this compaction, or memtable that contains this partition,
+ * or LONG.MAX_VALUE if no SSTable or memtable exist.
*/
public long maxPurgeableTimestamp(DecoratedKey key)
{
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
else if (sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
+ ColumnFamily cf = memtable.getColumnFamily(key);
+ if (cf != null)
+ min = Math.min(min, memtable.getMinTimestamp());
+ }
return min;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
RangeTombstone t = tombstone;
tombstone = null;
- if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+ if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
{
indexBuilder.tombstoneTracker().update(t, true);
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+ private static final String KEYSPACE = "CompactionControllerTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF2));
+ }
+
+ @Test
+ public void testMaxPurgeableTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+ DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // add to first memtable
+ applyMutation(CF1, rowKey, timestamp1);
+
+ // check max purgeable timestamp without any sstables
+ try(CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+ cfs.forceBlockingFlush();
+ assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+ }
+
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+ // create another sstable
+ applyMutation(CF1, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // check max purgeable timestamp when compacting the first sstable with and without a memtable
+ try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ }
+
+ // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+ cfs.forceBlockingFlush();
+
+ //newest to oldest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp1);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+
+ cfs.forceBlockingFlush();
+
+ //oldest to newest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp3);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp1);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+ }
+
+ @Test
+ public void testGetFullyExpiredSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // create sstable with tombstone that should be expired in no older timestamps
+ applyDeleteMutation(CF2, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // first sstable with tombstone is compacting
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+ // create another sstable with more recent timestamp
+ applyMutation(CF2, rowKey, timestamp1);
+ cfs.forceBlockingFlush();
+
+ // second sstable is overlapping
+ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+ // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+ int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(1, expired.size());
+ assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+ // however if we add an older mutation to the memtable then the sstable should not be expired
+ applyMutation(CF2, rowKey, timestamp3);
+ expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(0, expired.size());
+ }
+
+ private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ CellName colName = cellname("birthdate");
+ ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.add(cf, colName, val, timestamp);
+ rm.applyUnsafe();
+ }
+
+ private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.delete(cf, timestamp);
+ rm.applyUnsafe();
+ }
+
+
+
+}
[02/10] cassandra git commit: maxPurgeableTimestamp needs to check
memtables too
Posted by ma...@apache.org.
maxPurgeableTimestamp needs to check memtables too
Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7
Branch: refs/heads/cassandra-3.0
Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6
Parents: b4d67c9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jan 4 16:24:51 2016 +0100
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Mon Jan 25 13:30:05 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/AtomicBTreeColumns.java | 14 +-
src/java/org/apache/cassandra/db/Memtable.java | 15 +-
.../db/compaction/CompactionController.java | 21 +-
.../db/compaction/LazilyCompactedRow.java | 2 +-
.../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++
6 files changed, 232 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59997ff..cdc3b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
* Avoid over-fetching during the page of range queries (CASSANDRA-8521)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 710b289..f5b7712 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily
*
* @return the difference in size seen after merging the given columns
*/
- public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
DeletionInfo inputDeletionInfoCopy = null;
@@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily
{
indexer.updateRowLevelIndexes();
updater.finish();
- return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+ return updater;
}
else if (!monitorOwned)
{
@@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily
}
// the function we provide to the btree utilities to perform any column replacements
- private static final class ColumnUpdater implements UpdateFunction<Cell>
+ static final class ColumnUpdater implements UpdateFunction<Cell>
{
final AtomicBTreeColumns updating;
final CFMetaData metadata;
@@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily
long colUpdateTimeDelta = Long.MAX_VALUE;
final MemtableAllocator.DataReclaimer reclaimer;
List<Cell> inserted; // TODO: replace with walk of aborted BTree
+ long minTimestamp = Long.MAX_VALUE;
private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
@@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(insert);
+ minTimestamp = Math.min(minTimestamp, insert.timestamp());
return insert;
}
@@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily
{
Cell reconciled = existing.reconcile(update);
indexer.update(existing, reconciled);
+ // pick the smallest timestamp because we want to be consistent with the logic applied when inserting
+ // a cell in apply(Cell insert) above. For example given 3 timestamps where T3 < T2 < T1 then we want
+ // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the
+ // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp()
+ minTimestamp = Math.min(minTimestamp, update.timestamp());
if (existing != reconciled)
{
reconciled = reconciled.localCopy(metadata, allocator, writeOp);
@@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily
inserted.clear();
}
reclaimer.cancel();
+ minTimestamp = Long.MAX_VALUE;
}
protected void abort(Cell abort)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e96a71e..fb4da72 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -87,6 +87,9 @@ public class Memtable implements Comparable<Memtable>
private final long creationTime = System.currentTimeMillis();
private final long creationNano = System.nanoTime();
+ // The smallest timestamp for all partitions stored in this memtable
+ private long minTimestamp = Long.MAX_VALUE;
+
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
@@ -224,10 +227,11 @@ public class Memtable implements Comparable<Memtable>
}
}
- final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
- liveDataSize.addAndGet(initialSize + pair.left);
+ final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
+ minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
+ liveDataSize.addAndGet(initialSize + updater.dataSize);
currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
- return pair.right;
+ return updater.colUpdateTimeDelta;
}
// for debugging
@@ -316,6 +320,11 @@ public class Memtable implements Comparable<Memtable>
return creationTime;
}
+ public long getMinTimestamp()
+ {
+ return minTimestamp;
+ }
+
class FlushRunnable extends DiskAwareRunnable
{
private final ReplayPosition context;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index e0278c9..00d1344 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -96,7 +96,7 @@ public class CompactionController implements AutoCloseable
* Finds expired sstables
*
* works something like this;
- * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data
+ * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data
* 2. build a list of fully expired candidates
* 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp)
* - if not droppable, remove from candidates
@@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable
minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp());
}
+ for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables())
+ minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
+
// At this point, minTimestamp denotes the lowest timestamp of any relevant
- // SSTable that contains a constructive value. candidates contains all the
+ // SSTable or Memtable that contains a constructive value. candidates contains all the
// candidates with no constructive values. The ones out of these that have
// (getMaxTimestamp() < minTimestamp) serve no purpose anymore.
@@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable
* @return the largest timestamp before which it's okay to drop tombstones for the given partition;
* i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
* in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
- * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists.
+ * participating in this compaction, or memtable that contains this partition,
+ * or LONG.MAX_VALUE if no SSTable or memtable exist.
*/
public long maxPurgeableTimestamp(DecoratedKey key)
{
@@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable
else if (sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
+ ColumnFamily cf = memtable.getColumnFamily(key);
+ if (cf != null)
+ min = Math.min(min, memtable.getMinTimestamp());
+ }
return min;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 93505ae..ec82571 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
RangeTombstone t = tombstone;
tombstone = null;
- if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore))
+ if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp())
{
indexBuilder.tombstoneTracker().update(t, true);
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
new file mode 100644
index 0000000..750a38e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionControllerTest extends SchemaLoader
+{
+ private static final String KEYSPACE = "CompactionControllerTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF2));
+ }
+
+ @Test
+ public void testMaxPurgeableTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+ DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // add to first memtable
+ applyMutation(CF1, rowKey, timestamp1);
+
+ // check max purgeable timestamp without any sstables
+ try(CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+ cfs.forceBlockingFlush();
+ assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+ }
+
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+
+ // create another sstable
+ applyMutation(CF1, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // check max purgeable timestamp when compacting the first sstable with and without a memtable
+ try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ }
+
+ // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+ cfs.forceBlockingFlush();
+
+ //newest to oldest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp1);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+
+ cfs.forceBlockingFlush();
+
+ //oldest to newest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ applyMutation(CF1, rowKey, timestamp3);
+ applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(CF1, rowKey, timestamp1);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+ }
+
+ @Test
+ public void testGetFullyExpiredSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // create sstable with tombstone that should be expired in no older timestamps
+ applyDeleteMutation(CF2, rowKey, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // first sstable with tombstone is compacting
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
+
+ // create another sstable with more recent timestamp
+ applyMutation(CF2, rowKey, timestamp1);
+ cfs.forceBlockingFlush();
+
+ // second sstable is overlapping
+ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
+
+ // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+ int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(1, expired.size());
+ assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+ // however if we add an older mutation to the memtable then the sstable should not be expired
+ applyMutation(CF2, rowKey, timestamp3);
+ expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(0, expired.size());
+ }
+
+ private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ CellName colName = cellname("birthdate");
+ ByteBuffer val = ByteBufferUtil.bytes(1L);
+
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.add(cf, colName, val, timestamp);
+ rm.applyUnsafe();
+ }
+
+ private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+ {
+ Mutation rm = new Mutation(KEYSPACE, rowKey);
+ rm.delete(cf, timestamp);
+ rm.applyUnsafe();
+ }
+
+
+
+}
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.3
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.3
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f2174280
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f2174280
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f2174280
Branch: refs/heads/cassandra-3.3
Commit: f2174280d6dcbb951b539dc558d1e1ae81f43b5c
Parents: acb7fed 442f473
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:09:15 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:09:15 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 9 +
.../db/compaction/CompactionController.java | 19 +-
.../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ed56a78,70abffe..81657f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,9 -11,20 +15,10 @@@ Merged from 3.0
tombstone (CASSANDRA-10743)
* MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
* Fix potential assertion error during compaction (CASSANDRA-10944)
- * Fix counting of received sstables in streaming (CASSANDRA-10949)
- * Implement hints compression (CASSANDRA-9428)
- * Fix potential assertion error when reading static columns (CASSANDRA-10903)
- * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
- * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
- * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
- * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
- * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
- * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
- * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
Merged from 2.2:
-2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
+ * (cqlsh) encode input correctly when saving history
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
* Start L0 STCS-compactions even if there is a L0 -> L1 compaction
going (CASSANDRA-10979)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 8e7a43c,5d5f7bf..952c045
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -334,37 -316,24 +338,42 @@@ public class Memtable implements Compar
return creationTime;
}
+ public long getMinTimestamp()
+ {
+ return minTimestamp;
+ }
+
- class FlushRunnable extends DiskAwareRunnable
+ class FlushRunnable implements Callable<SSTableMultiWriter>
{
- private final ReplayPosition context;
+ public final ReplayPosition context;
private final long estimatedSize;
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
private final boolean isBatchLogTable;
+ private final SSTableMultiWriter writer;
+
+ // keeping these to be able to log what we are actually flushing
+ private final PartitionPosition from;
+ private final PartitionPosition to;
- FlushRunnable(ReplayPosition context)
+ FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
{
- this.context = context;
+ this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
+ }
+
+ FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
+ {
+ this(context, partitions, null, null, null, txn);
+ }
+ FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+ {
+ this.context = context;
+ this.toFlush = toFlush;
+ this.from = from;
+ this.to = to;
long keySize = 0;
- for (PartitionPosition key : partitions.keySet())
+ for (PartitionPosition key : toFlush.keySet())
{
// make sure we don't write non-sensical keys
assert key instanceof DecoratedKey;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index a7224a1,5cb60c5..79c6d53
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -190,10 -196,18 +196,17 @@@ public class CompactionController imple
{
// if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
// we check index file instead.
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
- min = Math.min(min, sstable.getMinTimestamp());
- else if (sstable.getBloomFilter().isPresent(key))
+ if ((sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
+ || sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
+ Partition partition = memtable.getPartition(key);
+ if (partition != null)
+ min = Math.min(min, partition.stats().minTimestamp);
+ }
return min;
}
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442f4737
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442f4737
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442f4737
Branch: refs/heads/cassandra-3.0
Commit: 442f4737ceddd34f14210da49cee4d48b468f01e
Parents: 2f8e5f3 b5d6d4f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:05:26 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:05:59 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 9 +
.../db/compaction/CompactionController.java | 19 +-
.../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bb26b20,cdc3b34..70abffe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,8 +1,31 @@@
+3.0.3
+ * Update CQL documentation (CASSANDRA-10899)
+ * Check the column name, not cell name, for dropped columns when reading
+ legacy sstables (CASSANDRA-11018)
+ * Don't attempt to index clustering values of static rows (CASSANDRA-11021)
+ * Remove checksum files after replaying hints (CASSANDRA-10947)
+ * Support passing base table metadata to custom 2i validation (CASSANDRA-10924)
+ * Ensure stale index entries are purged during reads (CASSANDRA-11013)
+ * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
+ * Fix UnsupportedOperationException when reading old sstable with range
+ tombstone (CASSANDRA-10743)
+ * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
+ * Fix potential assertion error during compaction (CASSANDRA-10944)
+ * Fix counting of received sstables in streaming (CASSANDRA-10949)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
+ * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
+ * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
+ * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
+ * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
+ * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
+Merged from 2.2:
2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
- * Avoid over-fetching during the page of range queries (CASSANDRA-8521)
* Start L0 STCS-compactions even if there is a L0 -> L1 compaction
going (CASSANDRA-10979)
* Make UUID LSB unique per process (CASSANDRA-7925)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 96b1775,fb4da72..5d5f7bf
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -237,12 -227,24 +240,13 @@@ public class Memtable implements Compar
}
}
- final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
- minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
- liveDataSize.addAndGet(initialSize + updater.dataSize);
- currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
- return updater.colUpdateTimeDelta;
- }
-
- // for debugging
- public String contents()
- {
- StringBuilder builder = new StringBuilder();
- builder.append("{");
- for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
- {
- builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
- }
- builder.append("}");
- return builder.toString();
+ long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
++ minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp);
+ liveDataSize.addAndGet(initialSize + pair[0]);
+ columnsCollector.update(update.columns());
+ statsCollector.update(update.stats());
+ currentOperations.addAndGet(update.operationCount());
+ return pair[1];
}
public int partitionCount()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 259e1b9,00d1344..5cb60c5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -19,9 -19,6 +19,11 @@@ package org.apache.cassandra.db.compact
import java.util.*;
++import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import com.google.common.collect.Iterables;
+
++import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -195,6 -190,13 +201,13 @@@ public class CompactionController imple
else if (sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
- ColumnFamily cf = memtable.getColumnFamily(key);
- if (cf != null)
- min = Math.min(min, memtable.getMinTimestamp());
++ Partition partition = memtable.getPartition(key);
++ if (partition != null)
++ min = Math.min(min, partition.stats().minTimestamp);
+ }
return min;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 0000000,750a38e..e781716
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -1,0 -1,191 +1,195 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.compaction;
+
+ import java.nio.ByteBuffer;
+ import java.util.Set;
+
+ import com.google.common.collect.Sets;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.Util;
++import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.composites.CellName;
++import org.apache.cassandra.db.RowUpdateBuilder;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
++import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+
-import static org.apache.cassandra.Util.cellname;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+
+ public class CompactionControllerTest extends SchemaLoader
+ {
+ private static final String KEYSPACE = "CompactionControllerTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
- SimpleStrategy.class,
- KSMetaData.optsWithRF(1),
- SchemaLoader.standardCFMD(KEYSPACE, CF1),
- SchemaLoader.standardCFMD(KEYSPACE, CF2));
++ KeyspaceParams.simple(1),
++ CFMetaData.Builder.create(KEYSPACE, CF1, true, false, false)
++ .addPartitionKey("pk", AsciiType.instance)
++ .addClusteringColumn("ck", AsciiType.instance)
++ .addRegularColumn("val", AsciiType.instance)
++ .build(),
++ CFMetaData.Builder.create(KEYSPACE, CF2, true, false, false)
++ .addPartitionKey("pk", AsciiType.instance)
++ .addClusteringColumn("ck", AsciiType.instance)
++ .addRegularColumn("val", AsciiType.instance)
++ .build());
+ }
+
+ @Test
+ public void testMaxPurgeableTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
- ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
- DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
++ DecoratedKey key = Util.dk("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // add to first memtable
- applyMutation(CF1, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp1);
+
+ // check max purgeable timestamp without any sstables
+ try(CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+ cfs.forceBlockingFlush();
+ assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+ }
+
- Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
+
+ // create another sstable
- applyMutation(CF1, rowKey, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // check max purgeable timestamp when compacting the first sstable with and without a memtable
+ try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
- applyMutation(CF1, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ }
+
+ // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+ cfs.forceBlockingFlush();
+
+ //newest to oldest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
- applyMutation(CF1, rowKey, timestamp1);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+
+ cfs.forceBlockingFlush();
+
+ //oldest to newest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
- applyMutation(CF1, rowKey, timestamp3);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp1);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+ }
+
+ @Test
+ public void testGetFullyExpiredSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+
- ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
++ DecoratedKey key = Util.dk("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // create sstable with tombstone that should be expired in no older timestamps
- applyDeleteMutation(CF2, rowKey, timestamp2);
++ applyDeleteMutation(cfs.metadata, key, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // first sstable with tombstone is compacting
- Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables());
+
+ // create another sstable with more recent timestamp
- applyMutation(CF2, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp1);
+ cfs.forceBlockingFlush();
+
+ // second sstable is overlapping
- Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
++ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getLiveSSTables()), compacting);
+
+ // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+ int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(1, expired.size());
+ assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+ // however if we add an older mutation to the memtable then the sstable should not be expired
- applyMutation(CF2, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp3);
+ expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(0, expired.size());
+ }
+
- private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
++ private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+ {
- CellName colName = cellname("birthdate");
+ ByteBuffer val = ByteBufferUtil.bytes(1L);
+
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.add(cf, colName, val, timestamp);
- rm.applyUnsafe();
++ new RowUpdateBuilder(cfm, timestamp, key)
++ .clustering("ck")
++ .add("val", val)
++ .build()
++ .applyUnsafe();
+ }
+
- private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
++ private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+ {
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.delete(cf, timestamp);
- rm.applyUnsafe();
++ new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
++ .applyUnsafe();
+ }
-
-
-
+ }
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442f4737
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442f4737
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442f4737
Branch: refs/heads/trunk
Commit: 442f4737ceddd34f14210da49cee4d48b468f01e
Parents: 2f8e5f3 b5d6d4f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:05:26 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:05:59 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 9 +
.../db/compaction/CompactionController.java | 19 +-
.../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bb26b20,cdc3b34..70abffe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,8 +1,31 @@@
+3.0.3
+ * Update CQL documentation (CASSANDRA-10899)
+ * Check the column name, not cell name, for dropped columns when reading
+ legacy sstables (CASSANDRA-11018)
+ * Don't attempt to index clustering values of static rows (CASSANDRA-11021)
+ * Remove checksum files after replaying hints (CASSANDRA-10947)
+ * Support passing base table metadata to custom 2i validation (CASSANDRA-10924)
+ * Ensure stale index entries are purged during reads (CASSANDRA-11013)
+ * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
+ * Fix UnsupportedOperationException when reading old sstable with range
+ tombstone (CASSANDRA-10743)
+ * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
+ * Fix potential assertion error during compaction (CASSANDRA-10944)
+ * Fix counting of received sstables in streaming (CASSANDRA-10949)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
+ * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
+ * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
+ * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
+ * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
+ * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
+Merged from 2.2:
2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
- * Avoid over-fetching during the page of range queries (CASSANDRA-8521)
* Start L0 STCS-compactions even if there is a L0 -> L1 compaction
going (CASSANDRA-10979)
* Make UUID LSB unique per process (CASSANDRA-7925)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 96b1775,fb4da72..5d5f7bf
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -237,12 -227,24 +240,13 @@@ public class Memtable implements Compar
}
}
- final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
- minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
- liveDataSize.addAndGet(initialSize + updater.dataSize);
- currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
- return updater.colUpdateTimeDelta;
- }
-
- // for debugging
- public String contents()
- {
- StringBuilder builder = new StringBuilder();
- builder.append("{");
- for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
- {
- builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
- }
- builder.append("}");
- return builder.toString();
+ long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
++ minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp);
+ liveDataSize.addAndGet(initialSize + pair[0]);
+ columnsCollector.update(update.columns());
+ statsCollector.update(update.stats());
+ currentOperations.addAndGet(update.operationCount());
+ return pair[1];
}
public int partitionCount()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 259e1b9,00d1344..5cb60c5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -19,9 -19,6 +19,11 @@@ package org.apache.cassandra.db.compact
import java.util.*;
++import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import com.google.common.collect.Iterables;
+
++import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -195,6 -190,13 +201,13 @@@ public class CompactionController imple
else if (sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
- ColumnFamily cf = memtable.getColumnFamily(key);
- if (cf != null)
- min = Math.min(min, memtable.getMinTimestamp());
++ Partition partition = memtable.getPartition(key);
++ if (partition != null)
++ min = Math.min(min, partition.stats().minTimestamp);
+ }
return min;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 0000000,750a38e..e781716
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -1,0 -1,191 +1,195 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.compaction;
+
+ import java.nio.ByteBuffer;
+ import java.util.Set;
+
+ import com.google.common.collect.Sets;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.Util;
++import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.composites.CellName;
++import org.apache.cassandra.db.RowUpdateBuilder;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
++import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+
-import static org.apache.cassandra.Util.cellname;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+
+ public class CompactionControllerTest extends SchemaLoader
+ {
+ private static final String KEYSPACE = "CompactionControllerTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
- SimpleStrategy.class,
- KSMetaData.optsWithRF(1),
- SchemaLoader.standardCFMD(KEYSPACE, CF1),
- SchemaLoader.standardCFMD(KEYSPACE, CF2));
++ KeyspaceParams.simple(1),
++ CFMetaData.Builder.create(KEYSPACE, CF1, true, false, false)
++ .addPartitionKey("pk", AsciiType.instance)
++ .addClusteringColumn("ck", AsciiType.instance)
++ .addRegularColumn("val", AsciiType.instance)
++ .build(),
++ CFMetaData.Builder.create(KEYSPACE, CF2, true, false, false)
++ .addPartitionKey("pk", AsciiType.instance)
++ .addClusteringColumn("ck", AsciiType.instance)
++ .addRegularColumn("val", AsciiType.instance)
++ .build());
+ }
+
+ @Test
+ public void testMaxPurgeableTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
- ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
- DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
++ DecoratedKey key = Util.dk("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // add to first memtable
- applyMutation(CF1, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp1);
+
+ // check max purgeable timestamp without any sstables
+ try(CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+ cfs.forceBlockingFlush();
+ assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+ }
+
- Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
+
+ // create another sstable
- applyMutation(CF1, rowKey, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // check max purgeable timestamp when compacting the first sstable with and without a memtable
+ try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
- applyMutation(CF1, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ }
+
+ // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+ cfs.forceBlockingFlush();
+
+ //newest to oldest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
- applyMutation(CF1, rowKey, timestamp1);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+
+ cfs.forceBlockingFlush();
+
+ //oldest to newest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
- applyMutation(CF1, rowKey, timestamp3);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp1);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+ }
+
+ @Test
+ public void testGetFullyExpiredSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+
- ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
++ DecoratedKey key = Util.dk("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // create sstable with tombstone that should be expired in no older timestamps
- applyDeleteMutation(CF2, rowKey, timestamp2);
++ applyDeleteMutation(cfs.metadata, key, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // first sstable with tombstone is compacting
- Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables());
+
+ // create another sstable with more recent timestamp
- applyMutation(CF2, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp1);
+ cfs.forceBlockingFlush();
+
+ // second sstable is overlapping
- Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
++ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getLiveSSTables()), compacting);
+
+ // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+ int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(1, expired.size());
+ assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+ // however if we add an older mutation to the memtable then the sstable should not be expired
- applyMutation(CF2, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp3);
+ expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(0, expired.size());
+ }
+
- private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
++ private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+ {
- CellName colName = cellname("birthdate");
+ ByteBuffer val = ByteBufferUtil.bytes(1L);
+
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.add(cf, colName, val, timestamp);
- rm.applyUnsafe();
++ new RowUpdateBuilder(cfm, timestamp, key)
++ .clustering("ck")
++ .add("val", val)
++ .build()
++ .applyUnsafe();
+ }
+
- private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
++ private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+ {
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.delete(cf, timestamp);
- rm.applyUnsafe();
++ new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
++ .applyUnsafe();
+ }
-
-
-
+ }
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.3
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.3
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f2174280
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f2174280
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f2174280
Branch: refs/heads/trunk
Commit: f2174280d6dcbb951b539dc558d1e1ae81f43b5c
Parents: acb7fed 442f473
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:09:15 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:09:15 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 9 +
.../db/compaction/CompactionController.java | 19 +-
.../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ed56a78,70abffe..81657f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,9 -11,20 +15,10 @@@ Merged from 3.0
tombstone (CASSANDRA-10743)
* MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
* Fix potential assertion error during compaction (CASSANDRA-10944)
- * Fix counting of received sstables in streaming (CASSANDRA-10949)
- * Implement hints compression (CASSANDRA-9428)
- * Fix potential assertion error when reading static columns (CASSANDRA-10903)
- * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
- * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
- * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
- * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
- * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
- * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
- * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
Merged from 2.2:
-2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
+ * (cqlsh) encode input correctly when saving history
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
* Start L0 STCS-compactions even if there is a L0 -> L1 compaction
going (CASSANDRA-10979)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 8e7a43c,5d5f7bf..952c045
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -334,37 -316,24 +338,42 @@@ public class Memtable implements Compar
return creationTime;
}
+ public long getMinTimestamp()
+ {
+ return minTimestamp;
+ }
+
- class FlushRunnable extends DiskAwareRunnable
+ class FlushRunnable implements Callable<SSTableMultiWriter>
{
- private final ReplayPosition context;
+ public final ReplayPosition context;
private final long estimatedSize;
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
private final boolean isBatchLogTable;
+ private final SSTableMultiWriter writer;
+
+ // keeping these to be able to log what we are actually flushing
+ private final PartitionPosition from;
+ private final PartitionPosition to;
- FlushRunnable(ReplayPosition context)
+ FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
{
- this.context = context;
+ this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
+ }
+
+ FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
+ {
+ this(context, partitions, null, null, null, txn);
+ }
+ FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+ {
+ this.context = context;
+ this.toFlush = toFlush;
+ this.from = from;
+ this.to = to;
long keySize = 0;
- for (PartitionPosition key : partitions.keySet())
+ for (PartitionPosition key : toFlush.keySet())
{
// make sure we don't write non-sensical keys
assert key instanceof DecoratedKey;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2174280/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index a7224a1,5cb60c5..79c6d53
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -190,10 -196,18 +196,17 @@@ public class CompactionController imple
{
// if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
// we check index file instead.
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
- min = Math.min(min, sstable.getMinTimestamp());
- else if (sstable.getBloomFilter().isPresent(key))
+ if ((sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
+ || sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
+ Partition partition = memtable.getPartition(key);
+ if (partition != null)
+ min = Math.min(min, partition.stats().minTimestamp);
+ }
return min;
}
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442f4737
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442f4737
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442f4737
Branch: refs/heads/cassandra-3.3
Commit: 442f4737ceddd34f14210da49cee4d48b468f01e
Parents: 2f8e5f3 b5d6d4f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 25 10:05:26 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 25 10:05:59 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 9 +
.../db/compaction/CompactionController.java | 19 +-
.../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++
4 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bb26b20,cdc3b34..70abffe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,8 +1,31 @@@
+3.0.3
+ * Update CQL documentation (CASSANDRA-10899)
+ * Check the column name, not cell name, for dropped columns when reading
+ legacy sstables (CASSANDRA-11018)
+ * Don't attempt to index clustering values of static rows (CASSANDRA-11021)
+ * Remove checksum files after replaying hints (CASSANDRA-10947)
+ * Support passing base table metadata to custom 2i validation (CASSANDRA-10924)
+ * Ensure stale index entries are purged during reads (CASSANDRA-11013)
+ * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
+ * Fix UnsupportedOperationException when reading old sstable with range
+ tombstone (CASSANDRA-10743)
+ * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
+ * Fix potential assertion error during compaction (CASSANDRA-10944)
+ * Fix counting of received sstables in streaming (CASSANDRA-10949)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
+ * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
+ * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
+ * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
+ * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
+ * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
+Merged from 2.2:
2.2.5
+ * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949)
* Apply change to compaction throughput in real time (CASSANDRA-10025)
* Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955)
- * Avoid over-fetching during the page of range queries (CASSANDRA-8521)
* Start L0 STCS-compactions even if there is a L0 -> L1 compaction
going (CASSANDRA-10979)
* Make UUID LSB unique per process (CASSANDRA-7925)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 96b1775,fb4da72..5d5f7bf
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -237,12 -227,24 +240,13 @@@ public class Memtable implements Compar
}
}
- final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer);
- minTimestamp = Math.min(minTimestamp, updater.minTimestamp);
- liveDataSize.addAndGet(initialSize + updater.dataSize);
- currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
- return updater.colUpdateTimeDelta;
- }
-
- // for debugging
- public String contents()
- {
- StringBuilder builder = new StringBuilder();
- builder.append("{");
- for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
- {
- builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
- }
- builder.append("}");
- return builder.toString();
+ long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
++ minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp);
+ liveDataSize.addAndGet(initialSize + pair[0]);
+ columnsCollector.update(update.columns());
+ statsCollector.update(update.stats());
+ currentOperations.addAndGet(update.operationCount());
+ return pair[1];
}
public int partitionCount()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 259e1b9,00d1344..5cb60c5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -19,9 -19,6 +19,11 @@@ package org.apache.cassandra.db.compact
import java.util.*;
++import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import com.google.common.collect.Iterables;
+
++import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -195,6 -190,13 +201,13 @@@ public class CompactionController imple
else if (sstable.getBloomFilter().isPresent(key))
min = Math.min(min, sstable.getMinTimestamp());
}
+
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
- ColumnFamily cf = memtable.getColumnFamily(key);
- if (cf != null)
- min = Math.min(min, memtable.getMinTimestamp());
++ Partition partition = memtable.getPartition(key);
++ if (partition != null)
++ min = Math.min(min, partition.stats().minTimestamp);
+ }
return min;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 0000000,750a38e..e781716
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -1,0 -1,191 +1,195 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.compaction;
+
+ import java.nio.ByteBuffer;
+ import java.util.Set;
+
+ import com.google.common.collect.Sets;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.Util;
++import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.composites.CellName;
++import org.apache.cassandra.db.RowUpdateBuilder;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
++import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+
-import static org.apache.cassandra.Util.cellname;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+
+ public class CompactionControllerTest extends SchemaLoader
+ {
+ private static final String KEYSPACE = "CompactionControllerTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
- SimpleStrategy.class,
- KSMetaData.optsWithRF(1),
- SchemaLoader.standardCFMD(KEYSPACE, CF1),
- SchemaLoader.standardCFMD(KEYSPACE, CF2));
++ KeyspaceParams.simple(1),
++ CFMetaData.Builder.create(KEYSPACE, CF1, true, false, false)
++ .addPartitionKey("pk", AsciiType.instance)
++ .addClusteringColumn("ck", AsciiType.instance)
++ .addRegularColumn("val", AsciiType.instance)
++ .build(),
++ CFMetaData.Builder.create(KEYSPACE, CF2, true, false, false)
++ .addPartitionKey("pk", AsciiType.instance)
++ .addClusteringColumn("ck", AsciiType.instance)
++ .addRegularColumn("val", AsciiType.instance)
++ .build());
+ }
+
+ @Test
+ public void testMaxPurgeableTimestamp()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+
- ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
- DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey);
++ DecoratedKey key = Util.dk("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // add to first memtable
- applyMutation(CF1, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp1);
+
+ // check max purgeable timestamp without any sstables
+ try(CompactionController controller = new CompactionController(cfs, null, 0))
+ {
+ assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+
+ cfs.forceBlockingFlush();
+ assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
+ }
+
- Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
+
+ // create another sstable
- applyMutation(CF1, rowKey, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // check max purgeable timestamp when compacting the first sstable with and without a memtable
+ try (CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+
- applyMutation(CF1, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ }
+
+ // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
+ cfs.forceBlockingFlush();
+
+ //newest to oldest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
- applyMutation(CF1, rowKey, timestamp1);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp3);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+
+ cfs.forceBlockingFlush();
+
+ //oldest to newest
+ try (CompactionController controller = new CompactionController(cfs, null, 0))
+ {
- applyMutation(CF1, rowKey, timestamp3);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp2);
++ applyMutation(cfs.metadata, key, timestamp1);
+
+ assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ }
+ }
+
+ @Test
+ public void testGetFullyExpiredSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+
- ByteBuffer rowKey = ByteBufferUtil.bytes("k1");
++ DecoratedKey key = Util.dk("k1");
+
+ long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp
+ long timestamp2 = timestamp1 - 5;
+ long timestamp3 = timestamp2 - 5; // oldest timestamp
+
+ // create sstable with tombstone that should be expired in no older timestamps
- applyDeleteMutation(CF2, rowKey, timestamp2);
++ applyDeleteMutation(cfs.metadata, key, timestamp2);
+ cfs.forceBlockingFlush();
+
+ // first sstable with tombstone is compacting
- Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables());
++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables());
+
+ // create another sstable with more recent timestamp
- applyMutation(CF2, rowKey, timestamp1);
++ applyMutation(cfs.metadata, key, timestamp1);
+ cfs.forceBlockingFlush();
+
+ // second sstable is overlapping
- Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting);
++ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getLiveSSTables()), compacting);
+
+ // the first sstable should be expired because the overlapping sstable is newer and the gc period is later
+ int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5;
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(1, expired.size());
+ assertEquals(compacting.iterator().next(), expired.iterator().next());
+
+ // however if we add an older mutation to the memtable then the sstable should not be expired
- applyMutation(CF2, rowKey, timestamp3);
++ applyMutation(cfs.metadata, key, timestamp3);
+ expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore);
+ assertNotNull(expired);
+ assertEquals(0, expired.size());
+ }
+
- private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
++ private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+ {
- CellName colName = cellname("birthdate");
+ ByteBuffer val = ByteBufferUtil.bytes(1L);
+
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.add(cf, colName, val, timestamp);
- rm.applyUnsafe();
++ new RowUpdateBuilder(cfm, timestamp, key)
++ .clustering("ck")
++ .add("val", val)
++ .build()
++ .applyUnsafe();
+ }
+
- private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
++ private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
+ {
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.delete(cf, timestamp);
- rm.applyUnsafe();
++ new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
++ .applyUnsafe();
+ }
-
-
-
+ }