You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:33 UTC

[2/7] cassandra git commit: Extend Transactional API to sstable lifecycle management

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
new file mode 100644
index 0000000..d53a830
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -0,0 +1,158 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.big.BigTableReader;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class HelpersTest
+{
+
+    static Set<Integer> a = set(1, 2, 3);
+    static Set<Integer> b = set(4, 5, 6);
+    static Set<Integer> c = set(7, 8, 9);
+    static Set<Integer> abc = set(1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+    /**
+     * filterIn is expected to retain exactly those elements of its first argument that occur in
+     * at least one of the filter sets. (This also tests orIn.)
+     */
+    @Test
+    public void testFilterIn()
+    {
+        Set<Integer> aAndC = set(1, 2, 3, 7, 8, 9);
+        Set<Integer> nothing = set();
+
+        check(Helpers.filterIn(abc, a), a);
+        check(Helpers.filterIn(abc, a, c), aAndC);
+        check(Helpers.filterIn(a, c), nothing);
+    }
+
+    /**
+     * filterOut is expected to drop every element of its first argument that occurs in the filter
+     * set and to keep the rest. (This also tests notIn.)
+     */
+    @Test
+    public void testFilterOut()
+    {
+        Set<Integer> bAndC = set(4, 5, 6, 7, 8, 9);
+        Set<Integer> aAndC = set(1, 2, 3, 7, 8, 9);
+        Set<Integer> nothing = set();
+
+        check(Helpers.filterOut(abc, a), bAndC);
+        check(Helpers.filterOut(abc, b), aAndC);
+        check(Helpers.filterOut(a, a), nothing);
+    }
+
+    /**
+     * concatUniq is expected to yield each distinct element once, however often the set holding
+     * it is passed in.
+     */
+    @Test
+    public void testConcatUniq()
+    {
+        Iterable<Integer> uniq = Helpers.concatUniq(a, b, a, c, b, a);
+        check(uniq, abc);
+        // check() compares through a set copy and so cannot notice a repeated element;
+        // counting the raw elements closes that gap
+        Assert.assertEquals(abc.size(), Lists.newArrayList(uniq).size());
+    }
+
+    @Test
+    public void testIdentityMap()
+    {
+        Integer one = new Integer(1);
+        Integer two = new Integer(2);
+        Integer three = new Integer(3);
+        Map<Integer, Integer> identity = Helpers.identityMap(set(one, two, three));
+        Assert.assertEquals(3, identity.size());
+        Assert.assertSame(one, identity.get(1));
+        Assert.assertSame(two, identity.get(2));
+        Assert.assertSame(three, identity.get(3));
+    }
+
+    /**
+     * Helpers.replace must raise an AssertionError for each of the three malformed requests below
+     * and must return the replacement contents for a well-formed one.
+     * NOTE(review): if those errors come from Java assert statements, this test can only pass with
+     * assertions enabled (-ea) - confirm against Helpers.
+     */
+    @Test
+    public void testReplace()
+    {
+        // adding c although the original still contains it (only a is being removed)
+        boolean rejected = false;
+        try
+        {
+            Helpers.replace(abc, a, c);
+        }
+        catch (AssertionError expected)
+        {
+            rejected = true;
+        }
+        Assert.assertTrue(rejected);
+
+        // removing abc from an original (a) that lacks most of those elements
+        rejected = false;
+        try
+        {
+            Helpers.replace(a, abc, c);
+        }
+        catch (AssertionError expected)
+        {
+            rejected = true;
+        }
+        Assert.assertTrue(rejected);
+
+        // a map whose key 1 is bound to an equal but different Integer instance
+        rejected = false;
+        try
+        {
+            Map<Integer, Integer> notIdentity = ImmutableMap.of(1, new Integer(1), 2, 2, 3, 3);
+            Helpers.replace(notIdentity, a, b);
+        }
+        catch (AssertionError expected)
+        {
+            rejected = true;
+        }
+        Assert.assertTrue(rejected);
+
+        // check it actually works when correct values provided
+        check(Helpers.replace(a, a, b), b);
+    }
+
+    /** Builds an immutable set holding the given elements; repeated values collapse into one. */
+    private static Set<Integer> set(Integer ... contents)
+    {
+        return ImmutableSet.<Integer>builder().add(contents).build();
+    }
+
+    /**
+     * Asserts that {@code check} yields exactly the elements of {@code expected}. The comparison
+     * is made on a set copy of {@code check}, so ordering and repeated elements are not examined.
+     */
+    private static void check(Iterable<Integer> check, Set<Integer> expected)
+    {
+        Set<Integer> actual = ImmutableSet.copyOf(check);
+        Assert.assertEquals(expected, actual);
+    }
+
+    /**
+     * Despite its name, this test exercises Helpers.setReplaced: the first call must flag every
+     * reader as replaced and accumulate no error, while repeating the call on the same readers
+     * must report a failure through the returned Throwable.
+     */
+    @Test
+    public void testSetupDeletionNotification()
+    {
+        Iterable<SSTableReader> sstables = Lists.newArrayList(MockSchema.sstable(1), MockSchema.sstable(2));
+
+        Throwable firstAttempt = Helpers.setReplaced(sstables, null);
+        Assert.assertNull(firstAttempt);
+        for (SSTableReader sstable : sstables)
+            Assert.assertTrue(sstable.isReplaced());
+
+        Throwable secondAttempt = Helpers.setReplaced(sstables, null);
+        Assert.assertNotNull(secondAttempt);
+    }
+
+    /**
+     * The first Helpers.markObsolete call must mark every reader as compacted and accumulate no
+     * error; repeating it on the same readers must report a failure through the returned Throwable.
+     */
+    @Test
+    public void testMarkObsolete()
+    {
+        Iterable<SSTableReader> sstables = Lists.newArrayList(MockSchema.sstable(1), MockSchema.sstable(2));
+
+        Throwable firstAttempt = Helpers.markObsolete(sstables, null);
+        Assert.assertNull(firstAttempt);
+        for (SSTableReader sstable : sstables)
+            Assert.assertTrue(sstable.isMarkedCompacted());
+
+        Throwable secondAttempt = Helpers.markObsolete(sstables, null);
+        Assert.assertNotNull(secondAttempt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
new file mode 100644
index 0000000..3153ef1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
@@ -0,0 +1,412 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+import org.apache.cassandra.utils.concurrent.Transactional.AbstractTransactional.State;
+
+import static com.google.common.base.Predicates.in;
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.size;
+import static org.apache.cassandra.db.lifecycle.Helpers.idIn;
+import static org.apache.cassandra.db.lifecycle.Helpers.orIn;
+import static org.apache.cassandra.db.lifecycle.Helpers.select;
+
+public class LifecycleTransactionTest extends AbstractTransactionalTest
+{
+    // value of the incremental-backups setting before each test; put back afterwards
+    private boolean incrementalBackups;
+
+    /** Remembers the current incremental-backups setting, then switches it off for the test. */
+    @Before
+    public void disableIncrementalBackup()
+    {
+        this.incrementalBackups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+    }
+
+    /** Reinstates the incremental-backups setting captured before the test ran. */
+    @After
+    public void restoreIncrementalBackup()
+    {
+        DatabaseDescriptor.setIncrementalBackupsEnabled(this.incrementalBackups);
+    }
+
+    /**
+     * Walks a single transaction through staging, checkpointing and obsoleting, checking after
+     * each step what the transaction and the tracker's view report, and that update/obsolete
+     * requests which do not fit the current state are rejected.
+     */
+    @Test
+    public void testUpdates() // (including obsoletion)
+    {
+        Tracker tracker = new Tracker(null, false);
+        // readers are the tracked originals (indices 0..2); readers2 and readers3 are built by
+        // separate calls for indices 0..3 and stand in for replacement/new sstables
+        SSTableReader[] readers = readersArray(0, 3);
+        SSTableReader[] readers2 = readersArray(0, 4);
+        SSTableReader[] readers3 = readersArray(0, 4);
+        tracker.addInitialSSTables(copyOf(readers));
+        LifecycleTransaction txn = tracker.tryModify(copyOf(readers), OperationType.UNKNOWN);
+
+        // stage a replacement for original 0 and the obsoletion of original 1
+        txn.update(readers2[0], true);
+        txn.obsolete(readers[1]);
+
+        Assert.assertTrue(txn.isObsolete(readers[1]));
+        Assert.assertFalse(txn.isObsolete(readers[0]));
+
+        testBadUpdate(txn, readers2[0], true);  // same reader && instances
+        testBadUpdate(txn, readers2[1], true);  // staged obsolete; cannot update
+        testBadUpdate(txn, readers3[0], true);  // same reader, diff instances
+        testBadUpdate(txn, readers2[2], false); // incorrectly declared original status
+        testBadUpdate(txn, readers2[3], true); // incorrectly declared original status
+
+        testBadObsolete(txn, readers[1]);  // staged obsolete; cannot obsolete again
+        testBadObsolete(txn, readers2[0]);  // staged update; cannot obsolete
+
+        // stage a reader that is not one of the originals
+        txn.update(readers2[3], false);
+
+        // the compacting count is expected to grow from 3 to 4 across the checkpoint
+        Assert.assertEquals(3, tracker.getView().compacting.size());
+        txn.checkpoint();
+        Assert.assertTrue(txn.isObsolete(readers[1]));
+        Assert.assertFalse(txn.isObsolete(readers[0]));
+        Assert.assertEquals(4, tracker.getView().compacting.size());
+        Assert.assertEquals(3, tracker.getView().sstables.size());
+        Assert.assertEquals(3, size(txn.current()));
+        Assert.assertTrue(all(of(readers2[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap)));
+        Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap)));
+
+        testBadObsolete(txn, readers[1]);  // logged obsolete; cannot obsolete again
+        testBadObsolete(txn, readers2[2]);  // never seen instance; cannot obsolete
+        testBadObsolete(txn, readers2[3]);  // non-original; cannot obsolete
+        testBadUpdate(txn, readers3[1], true);  // logged obsolete; cannot update
+        testBadUpdate(txn, readers2[0], true);  // same instance as logged update
+
+        txn.update(readers3[0], true);  // same reader as logged update, different instance
+        txn.checkpoint();
+
+        // counts are unchanged; readers3[0] has taken the place of readers2[0] in the view
+        Assert.assertEquals(4, tracker.getView().compacting.size());
+        Assert.assertEquals(3, tracker.getView().sstables.size());
+        Assert.assertEquals(3, size(txn.current()));
+        Assert.assertTrue(all(of(readers3[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap)));
+        Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap)));
+
+        testBadObsolete(txn, readers2[0]); // not current version of sstable
+
+        // after obsoleting the originals and checkpointing, the view holds a single sstable
+        txn.obsoleteOriginals();
+        txn.checkpoint();
+        Assert.assertEquals(1, tracker.getView().sstables.size());
+        txn.obsoleteOriginals(); // should be no-op
+        txn.checkpoint();
+        Assert.assertEquals(1, tracker.getView().sstables.size());
+        Assert.assertEquals(4, tracker.getView().compacting.size());
+    }
+
+    /**
+     * Checks which readers a transaction lets go of through cancel(), both before and after a
+     * checkpoint, and that every other cancel request is rejected.
+     */
+    @Test
+    public void testCancellation()
+    {
+        Tracker tracker = new Tracker(null, false);
+        List<SSTableReader> readers = readers(0, 3);
+        tracker.addInitialSSTables(readers);
+        LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN);
+
+        SSTableReader cancel = readers.get(0);           // an original, cancelled straight away
+        SSTableReader update = readers(1, 2).get(0);     // separately built reader for index 1, staged as an update
+        SSTableReader fresh = readers(3, 4).get(0);      // index 3 is not among the originals
+        SSTableReader notPresent = readers(4, 5).get(0); // never handed to the transaction
+
+        txn.cancel(cancel);
+        txn.update(update, true);
+        txn.update(fresh, false);
+
+        // none of these may be cancelled now; only the two remaining originals are compacting
+        testBadCancel(txn, cancel);
+        testBadCancel(txn, update);
+        testBadCancel(txn, fresh);
+        testBadCancel(txn, notPresent);
+        Assert.assertEquals(2, txn.originals().size());
+        Assert.assertEquals(2, tracker.getView().compacting.size());
+        Assert.assertTrue(all(readers.subList(1, 3), idIn(tracker.getView().compacting)));
+
+        txn.checkpoint();
+
+        // the same requests stay rejected after the checkpoint; fresh is now compacting as well
+        testBadCancel(txn, cancel);
+        testBadCancel(txn, update);
+        testBadCancel(txn, fresh);
+        testBadCancel(txn, notPresent);
+        Assert.assertEquals(2, txn.originals().size());
+        Assert.assertEquals(3, tracker.getView().compacting.size());
+        Assert.assertEquals(3, size(txn.current()));
+        Assert.assertTrue(all(concat(readers.subList(1, 3), of(fresh)), idIn(tracker.getView().compacting)));
+
+        // an original that was never updated or obsoleted can still be cancelled
+        txn.cancel(readers.get(2));
+        Assert.assertEquals(1, txn.originals().size());
+        Assert.assertEquals(2, tracker.getView().compacting.size());
+        Assert.assertEquals(2, size(txn.current()));
+        Assert.assertTrue(all(of(readers.get(1), fresh), idIn(tracker.getView().compacting)));
+    }
+
+    /**
+     * split() must move the requested originals into a new transaction and leave the remainder
+     * behind; a further split requested after an update has been staged must throw.
+     */
+    @Test
+    public void testSplit()
+    {
+        Tracker tracker = new Tracker(null, false);
+        List<SSTableReader> readers = readers(0, 3);
+        tracker.addInitialSSTables(readers);
+        LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN);
+
+        List<SSTableReader> moved = readers.subList(0, 1);
+        List<SSTableReader> kept = readers.subList(1, 3);
+        LifecycleTransaction txn2 = txn.split(moved);
+        Assert.assertEquals(2, txn.originals().size());
+        Assert.assertTrue(all(kept, in(txn.originals())));
+        Assert.assertEquals(1, txn2.originals().size());
+        Assert.assertTrue(all(moved, in(txn2.originals())));
+
+        txn.update(readers(1, 2).get(0), true);
+        try
+        {
+            txn.split(readers.subList(2, 3));
+        }
+        catch (Throwable expected)
+        {
+            // the split was refused, which is what this test demands
+            return;
+        }
+        Assert.fail();
+    }
+
+    /**
+     * Asserts that {@code txn.update(update, original)} is rejected, i.e. throws any Throwable;
+     * fails the calling test if the update is accepted.
+     */
+    private static void testBadUpdate(LifecycleTransaction txn, SSTableReader update, boolean original)
+    {
+        try
+        {
+            txn.update(update, original);
+        }
+        catch (Throwable expected)
+        {
+            // rejected as required
+            return;
+        }
+        Assert.fail();
+    }
+
+    /**
+     * Asserts that {@code txn.obsolete(update)} is rejected, i.e. throws any Throwable;
+     * fails the calling test if the request is accepted.
+     */
+    private static void testBadObsolete(LifecycleTransaction txn, SSTableReader update)
+    {
+        try
+        {
+            txn.obsolete(update);
+        }
+        catch (Throwable expected)
+        {
+            // rejected as required
+            return;
+        }
+        Assert.fail();
+    }
+
+    /**
+     * Asserts that {@code txn.cancel(cancel)} is rejected, i.e. throws any Throwable;
+     * fails the calling test if the request is accepted.
+     */
+    private static void testBadCancel(LifecycleTransaction txn, SSTableReader cancel)
+    {
+        try
+        {
+            txn.cancel(cancel);
+        }
+        catch (Throwable expected)
+        {
+            // rejected as required
+            return;
+        }
+        Assert.fail();
+    }
+
+    /**
+     * Supplies a freshly prepared transaction. Before building it, waits for outstanding sstable
+     * deletions and resets SSTableReader's tidying state.
+     */
+    protected TestableTransaction newTest()
+    {
+        SSTableDeletingTask.waitForDeletions();
+        SSTableReader.resetTidying();
+        TestableTransaction prepared = new TxnTest();
+        return prepared;
+    }
+
+    private static final class TxnTest extends TestableTransaction
+    {
+        final List<SSTableReader> originals;
+        final List<SSTableReader> untouchedOriginals;
+        final List<SSTableReader> loggedUpdate;
+        final List<SSTableReader> loggedObsolete;
+        final List<SSTableReader> stagedObsolete;
+        final List<SSTableReader> loggedNew;
+        final List<SSTableReader> stagedNew;
+        final Tracker tracker;
+        final LifecycleTransaction txn;
+
+        private static Tracker tracker(List<SSTableReader> readers)
+        {
+            Tracker tracker = new Tracker(MockSchema.cfs, false);
+            tracker.addInitialSSTables(readers);
+            return tracker;
+        }
+
+        private TxnTest()
+        {
+            this(readers(0, 8));
+        }
+
+        private TxnTest(List<SSTableReader> readers)
+        {
+            this(tracker(readers), readers);
+        }
+
+        private TxnTest(Tracker tracker, List<SSTableReader> readers)
+        {
+            this(tracker, readers, tracker.tryModify(readers, OperationType.UNKNOWN));
+        }
+
+        private TxnTest(Tracker tracker, List<SSTableReader> readers, LifecycleTransaction txn)
+        {
+            super(txn);
+            this.tracker = tracker;
+            this.originals = readers;
+            this.txn = txn;
+            update(txn, loggedUpdate = readers(0, 2), true);
+            obsolete(txn, loggedObsolete = readers.subList(2, 4));
+            update(txn, loggedNew = readers(8, 10), false);
+            txn.checkpoint();
+            update(txn, stagedNew = readers(10, 12), false);
+            obsolete(txn, stagedObsolete = copyOf(concat(loggedUpdate, originals.subList(4, 6))));
+            untouchedOriginals = originals.subList(6, 8);
+        }
+
+        private ReaderState state(SSTableReader reader, State state)
+        {
+            SSTableReader original = select(reader, originals);
+            boolean isOriginal = original != null;
+
+            switch (state)
+            {
+                case ABORTED:
+                {
+                    return new ReaderState(Action.NONE, Action.NONE, original, original, isOriginal);
+                }
+
+                case READY_TO_COMMIT:
+                {
+                    ReaderState prev = state(reader, State.IN_PROGRESS);
+                    Action logged;
+                    SSTableReader visible;
+                    if (prev.staged == Action.NONE)
+                    {
+                        logged = prev.logged;
+                        visible = prev.currentlyVisible;
+                    }
+                    else
+                    {
+                        logged = prev.staged;
+                        visible = prev.nextVisible;
+                    }
+                    return new ReaderState(logged, Action.NONE, visible, visible, isOriginal);
+                }
+
+                case IN_PROGRESS:
+                {
+                    Action logged = Action.get(loggedUpdate.contains(reader) || loggedNew.contains(reader), loggedObsolete.contains(reader));
+                    Action staged = Action.get(stagedNew.contains(reader), stagedObsolete.contains(reader));
+                    SSTableReader currentlyVisible = ReaderState.visible(reader, in(loggedObsolete), loggedNew, loggedUpdate, originals);
+                    SSTableReader nextVisible = ReaderState.visible(reader, orIn(stagedObsolete, loggedObsolete), stagedNew, loggedNew, loggedUpdate, originals);
+                    return new ReaderState(logged, staged, currentlyVisible, nextVisible, isOriginal);
+                }
+            }
+            throw new IllegalStateException();
+        }
+
+        private List<Pair<SSTableReader, ReaderState>> states(State state)
+        {
+            List<Pair<SSTableReader, ReaderState>> result = new ArrayList<>();
+            for (SSTableReader reader : concat(originals, loggedNew, stagedNew))
+                result.add(Pair.create(reader, state(reader, state)));
+            return result;
+        }
+
+        protected void doAssert(State state)
+        {
+            for (Pair<SSTableReader, ReaderState> pair : states(state))
+            {
+                SSTableReader reader = pair.left;
+                ReaderState readerState = pair.right;
+
+                Assert.assertEquals(readerState, txn.state(reader));
+                Assert.assertEquals(readerState.currentlyVisible, tracker.getView().sstablesMap.get(reader));
+                if (readerState.currentlyVisible == null && readerState.nextVisible == null && !readerState.original)
+                    Assert.assertTrue(reader.selfRef().globalCount() == 0);
+            }
+        }
+
+        protected void assertInProgress() throws Exception
+        {
+            doAssert(State.IN_PROGRESS);
+        }
+
+        protected void assertPrepared() throws Exception
+        {
+            doAssert(State.READY_TO_COMMIT);
+        }
+
+        protected void assertAborted() throws Exception
+        {
+            doAssert(State.ABORTED);
+            Assert.assertEquals(0, tracker.getView().compacting.size());
+            Assert.assertEquals(8, tracker.getView().sstables.size());
+            for (SSTableReader reader : concat(loggedNew, stagedNew))
+                Assert.assertTrue(reader.selfRef().globalCount() == 0);
+        }
+
+        protected void assertCommitted() throws Exception
+        {
+            doAssert(State.READY_TO_COMMIT);
+            Assert.assertEquals(0, tracker.getView().compacting.size());
+            Assert.assertEquals(6, tracker.getView().sstables.size());
+            for (SSTableReader reader : concat(loggedObsolete, stagedObsolete))
+                Assert.assertTrue(reader.selfRef().globalCount() == 0);
+        }
+    }
+
+    /** Array form of {@link #readers(int, int)}: one mock reader per index in [lb, ub). */
+    private static SSTableReader[] readersArray(int lb, int ub)
+    {
+        List<SSTableReader> list = readers(lb, ub);
+        return list.toArray(new SSTableReader[list.size()]);
+    }
+
+    /**
+     * Returns an immutable list with one mock reader per index in [lb, ub), each created by
+     * MockSchema.sstable(index, index, true). The list is empty when ub is not above lb.
+     */
+    private static List<SSTableReader> readers(int lb, int ub)
+    {
+        List<SSTableReader> built = new ArrayList<>();
+        int index = lb;
+        while (index < ub)
+        {
+            built.add(MockSchema.sstable(index, index, true));
+            index++;
+        }
+        return copyOf(built);
+    }
+
+    /** Calls {@code txn.update(reader, originals)} for each of the given readers, in order. */
+    private static void update(LifecycleTransaction txn, Iterable<SSTableReader> readers, boolean originals)
+    {
+        for (SSTableReader each : readers)
+        {
+            txn.update(each, originals);
+        }
+    }
+
+    /** Calls {@code txn.obsolete(reader)} for each of the given readers, in order. */
+    private static void obsolete(LifecycleTransaction txn, Iterable<SSTableReader> readers)
+    {
+        for (SSTableReader each : readers)
+        {
+            txn.obsolete(each);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
new file mode 100644
index 0000000..1eef7b0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -0,0 +1,342 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static java.util.Collections.singleton;
+
+public class TrackerTest
+{
+
+    /**
+     * Notification consumer for tests: records each notification together with its sender, or,
+     * when built with {@code throwException}, throws a RuntimeException on every notification.
+     */
+    private static final class MockListener implements INotificationConsumer
+    {
+        final boolean throwException;
+        final List<INotification> received = new ArrayList<>(); // notifications, in arrival order
+        final List<Object> senders = new ArrayList<>();         // sender of received.get(i) at index i
+
+        private MockListener(boolean throwException)
+        {
+            this.throwException = throwException;
+        }
+
+        public void handleNotification(INotification notification, Object sender)
+        {
+            if (!throwException)
+            {
+                received.add(notification);
+                senders.add(sender);
+                return;
+            }
+            throw new RuntimeException();
+        }
+    }
+
+    /**
+     * tryModify must hand out a transaction over the requested reader, must return null while
+     * that reader is held by an open transaction, and must accept an empty collection.
+     */
+    @Test
+    public void testTryModify()
+    {
+        Tracker tracker = new Tracker(MockSchema.cfs, false);
+        List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0), MockSchema.sstable(1), MockSchema.sstable(2));
+        tracker.addInitialSSTables(copyOf(readers));
+        SSTableReader first = readers.get(0);
+
+        try (LifecycleTransaction held = tracker.tryModify(first, OperationType.COMPACTION))
+        {
+            Assert.assertNotNull(held);
+            // a second attempt on the same reader is refused while the first is still open
+            Assert.assertNull(tracker.tryModify(first, OperationType.COMPACTION));
+            Assert.assertEquals(1, held.originals().size());
+            Assert.assertTrue(held.originals().contains(first));
+        }
+
+        List<SSTableReader> none = Collections.emptyList();
+        try (LifecycleTransaction empty = tracker.tryModify(none, OperationType.COMPACTION))
+        {
+            Assert.assertNotNull(empty);
+            Assert.assertEquals(0, empty.originals().size());
+        }
+    }
+
+    /**
+     * Tracker.apply must retry when the view is swapped underneath it, and must stop at once and
+     * return null when the predicate rejects the view.
+     */
+    @Test
+    public void testApply()
+    {
+        final Tracker tracker = new Tracker(null, false);
+        final View resultView = ViewTest.fakeView(0, 0);
+        final AtomicInteger count = new AtomicInteger();
+
+        Predicate<View> swapViewTwice = new Predicate<View>()
+        {
+            public boolean apply(View view)
+            {
+                // confound the CAS by swapping the view, and check we retry
+                if (count.incrementAndGet() < 3)
+                    tracker.view.set(ViewTest.fakeView(0, 0));
+                return true;
+            }
+        };
+        Function<View, View> toResultView = new Function<View, View>()
+        {
+            @Nullable
+            public View apply(View view)
+            {
+                return resultView;
+            }
+        };
+        tracker.apply(swapViewTwice, toResultView);
+        Assert.assertEquals(3, count.get());
+        Assert.assertEquals(resultView, tracker.getView());
+
+        count.set(0);
+        // check that if the predicate returns false, we stop immediately and return null
+        Predicate<View> rejectView = new Predicate<View>()
+        {
+            public boolean apply(View view)
+            {
+                count.incrementAndGet();
+                return false;
+            }
+        };
+        Assert.assertNull(tracker.apply(rejectView, null));
+        Assert.assertEquals(1, count.get());
+        Assert.assertEquals(resultView, tracker.getView());
+    }
+
+    @Test
+    public void testAddInitialSSTables()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        Tracker tracker = new Tracker(cfs, false);
+        List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17), MockSchema.sstable(1, 121), MockSchema.sstable(2, 9));
+        tracker.addInitialSSTables(copyOf(readers));
+
+        Assert.assertEquals(3, tracker.view.get().sstables.size());
+
+        for (SSTableReader reader : readers)
+            Assert.assertTrue(reader.isDeleteNotificationSetup());
+
+        Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
+    }
+
+    /**
+     * addSSTables must put all readers into the view, set up their delete notification, add their
+     * sizes to the liveDiskSpaceUsed metric and call the subscribed listener three times, the
+     * first time with an SSTableAddedNotification sent by the tracker.
+     * Incremental backups are switched off for the duration and restored in a finally block, so
+     * that a failing assertion cannot leak the changed setting into later tests.
+     */
+    @Test
+    public void testAddSSTables()
+    {
+        boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+        try
+        {
+            Tracker tracker = new Tracker(MockSchema.cfs, false);
+            MockListener listener = new MockListener(false);
+            tracker.subscribe(listener);
+            List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17), MockSchema.sstable(1, 121), MockSchema.sstable(2, 9));
+            tracker.addSSTables(copyOf(readers));
+
+            Assert.assertEquals(3, tracker.view.get().sstables.size());
+
+            for (SSTableReader reader : readers)
+                Assert.assertTrue(reader.isDeleteNotificationSetup());
+
+            Assert.assertEquals(17 + 121 + 9, MockSchema.cfs.metric.liveDiskSpaceUsed.getCount());
+            Assert.assertEquals(3, listener.senders.size());
+            Assert.assertEquals(tracker, listener.senders.get(0));
+            Assert.assertTrue(listener.received.get(0) instanceof SSTableAddedNotification);
+        }
+        finally
+        {
+            // always undo the change, including when an assertion above has failed
+            DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+        }
+    }
+
+    /**
+     * Runs the drop scenario first through tracker.dropSSTables() and then through
+     * cfs.invalidate(), waiting for sstable deletions to finish after each run.
+     */
+    @Test
+    public void testDropSSTables()
+    {
+        for (boolean invalidate : new boolean[]{ false, true })
+        {
+            testDropSSTables(invalidate);
+            SSTableDeletingTask.waitForDeletions();
+        }
+    }
+
+    /**
+     * Drops the sstables of a fresh column family while one of its three readers is held by an
+     * open transaction, then checks the view, the disk-space metrics, the reader flags and the
+     * notifications that result.
+     *
+     * @param invalidate true to drop through cfs.invalidate(false), false to drop through
+     *                   tracker.dropSSTables()
+     */
+    private void testDropSSTables(boolean invalidate)
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        Tracker tracker = cfs.getTracker();
+        MockListener listener = new MockListener(false);
+        tracker.subscribe(listener);
+        final List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 9, true), MockSchema.sstable(1, 15, true), MockSchema.sstable(2, 71, true));
+        tracker.addInitialSSTables(copyOf(readers));
+        try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION);)
+        {
+            SSTableDeletingTask.pauseDeletions(true);
+            try
+            {
+                if (invalidate)
+                    cfs.invalidate(false);
+                else
+                    tracker.dropSSTables();
+                // with deletions paused the total still covers all three readers (9 + 15 + 71),
+                // while only the reader held by the transaction counts as live
+                Assert.assertEquals(95, cfs.metric.totalDiskSpaceUsed.getCount());
+                Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
+                Assert.assertEquals(1, tracker.getView().sstables.size());
+            }
+            finally
+            {
+                // resume deletions even when an assertion failed, so that the pause cannot
+                // outlive this test and affect whatever runs after it
+                SSTableDeletingTask.pauseDeletions(false);
+            }
+        }
+        if (!invalidate)
+        {
+            // the reader that was held by the transaction survives the drop untouched
+            Assert.assertEquals(1, tracker.getView().sstables.size());
+            Assert.assertEquals(readers.get(0), Iterables.getFirst(tracker.getView().sstables, null));
+            Assert.assertEquals(1, readers.get(0).selfRef().globalCount());
+            Assert.assertFalse(readers.get(0).isMarkedCompacted());
+            for (SSTableReader reader : readers.subList(1, 3))
+            {
+                Assert.assertEquals(0, reader.selfRef().globalCount());
+                Assert.assertTrue(reader.isMarkedCompacted());
+            }
+            // a further drop that excludes the surviving reader is expected to return null
+            Assert.assertNull(tracker.dropSSTables(new Predicate<SSTableReader>()
+            {
+                public boolean apply(SSTableReader reader)
+                {
+                    return reader != readers.get(0);
+                }
+            }, OperationType.UNKNOWN, null));
+
+            Assert.assertEquals(1, tracker.getView().sstables.size());
+            Assert.assertEquals(1, listener.received.size());
+            Assert.assertEquals(tracker, listener.senders.get(0));
+            Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(0)).removed.size());
+            Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(0)).added.size());
+            Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
+            readers.get(0).selfRef().release();
+        }
+        else
+        {
+            // invalidation leaves nothing live and marks every reader compacted
+            Assert.assertEquals(0, tracker.getView().sstables.size());
+            Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+            for (SSTableReader reader : readers)
+                Assert.assertTrue(reader.isMarkedCompacted());
+        }
+    }
+
+    /**
+     * Drives a tracker through memtable switching, flushing and replacement, checking the view,
+     * the notifications and the live-disk-space metric at each step; then repeats the final
+     * replacement against a store that has already been invalidated. Incremental backups are
+     * switched off for the duration of the test and the previous setting is always restored.
+     */
+    @Test
+    public void testMemtableReplacement()
+    {
+        boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+        try
+        {
+            ColumnFamilyStore cfs = MockSchema.newCFS();
+            MockListener listener = new MockListener(false);
+            Tracker tracker = cfs.getTracker();
+            tracker.subscribe(listener);
+
+            Memtable prev1 = tracker.switchMemtable(true);
+            OpOrder.Group write1 = cfs.keyspace.writeOrder.getCurrent();
+            OpOrder.Barrier barrier1 = cfs.keyspace.writeOrder.newBarrier();
+            prev1.setDiscarding(barrier1, new AtomicReference<ReplayPosition>());
+            barrier1.issue();
+            Memtable prev2 = tracker.switchMemtable(false);
+            OpOrder.Group write2 = cfs.keyspace.writeOrder.getCurrent();
+            OpOrder.Barrier barrier2 = cfs.keyspace.writeOrder.newBarrier();
+            prev2.setDiscarding(barrier2, new AtomicReference<ReplayPosition>());
+            barrier2.issue();
+            Memtable cur = tracker.getView().getCurrentMemtable();
+            OpOrder.Group writecur = cfs.keyspace.writeOrder.getCurrent();
+            // a write group captured before a barrier was issued must still resolve to the
+            // memtable that barrier is discarding
+            Assert.assertEquals(prev1, tracker.getMemtableFor(write1, ReplayPosition.NONE));
+            Assert.assertEquals(prev2, tracker.getMemtableFor(write2, ReplayPosition.NONE));
+            Assert.assertEquals(cur, tracker.getMemtableFor(writecur, ReplayPosition.NONE));
+            // two switches, but only one renewal notification is expected
+            // NOTE(review): presumably the boolean passed to switchMemtable decides this - confirm
+            Assert.assertEquals(1, listener.received.size());
+            Assert.assertTrue(listener.received.get(0) instanceof MemtableRenewedNotification);
+            listener.received.clear();
+
+            tracker.markFlushing(prev2);
+            Assert.assertEquals(1, tracker.getView().flushingMemtables.size());
+            Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
+
+            tracker.markFlushing(prev1);
+            Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev1));
+            Assert.assertEquals(2, tracker.getView().flushingMemtables.size());
+
+            // replacing with no sstable just removes the memtable from the flushing set
+            tracker.replaceFlushed(prev1, null);
+            Assert.assertEquals(1, tracker.getView().flushingMemtables.size());
+            Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
+
+            SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
+            tracker.replaceFlushed(prev2, reader);
+            Assert.assertEquals(1, tracker.getView().sstables.size());
+            Assert.assertEquals(1, listener.received.size());
+            Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+            listener.received.clear();
+            Assert.assertTrue(reader.isDeleteNotificationSetup());
+            Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
+
+            // test invalidated CFS
+            cfs = MockSchema.newCFS();
+            tracker = cfs.getTracker();
+            listener = new MockListener(false);
+            tracker.subscribe(listener);
+            prev1 = tracker.switchMemtable(false);
+            tracker.markFlushing(prev1);
+            reader = MockSchema.sstable(0, 10, true, cfs);
+            cfs.invalidate(false);
+            tracker.replaceFlushed(prev1, reader);
+            Assert.assertEquals(0, tracker.getView().sstables.size());
+            Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
+            Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+            // the reader is still announced as added, followed by a list change removing one sstable
+            Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+            Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
+        }
+        finally
+        {
+            // restore the previous global setting even when an assertion above fails
+            DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+        }
+    }
+
+    /**
+     * Checks that each notify* method on the tracker delivers the matching notification type to a
+     * subscribed listener. The second half subscribes a MockListener(true) ahead of the recording
+     * listener (presumably one that throws - confirm against MockListener) and checks that the
+     * value-returning notify calls then return non-null while the recording listener is still served.
+     */
+    @Test
+    public void testNotifications()
+    {
+        SSTableReader first = MockSchema.sstable(0);
+        SSTableReader second = MockSchema.sstable(1);
+        Tracker tracker = new Tracker(null, false);
+        MockListener recorder = new MockListener(false);
+        tracker.subscribe(recorder);
+
+        // sstable added
+        tracker.notifyAdded(first);
+        SSTableAddedNotification added = (SSTableAddedNotification) recorder.received.get(0);
+        Assert.assertEquals(first, added.added);
+        recorder.received.clear();
+
+        // sstable deleting
+        tracker.notifyDeleting(first);
+        SSTableDeletingNotification deleting = (SSTableDeletingNotification) recorder.received.get(0);
+        Assert.assertEquals(first, deleting.deleting);
+        recorder.received.clear();
+
+        // sstable list changed
+        Assert.assertNull(tracker.notifySSTablesChanged(singleton(first), singleton(second), OperationType.COMPACTION, null));
+        SSTableListChangedNotification changed = (SSTableListChangedNotification) recorder.received.get(0);
+        Assert.assertEquals(singleton(first), changed.removed);
+        Assert.assertEquals(singleton(second), changed.added);
+        recorder.received.clear();
+
+        // repaired status changed
+        tracker.notifySSTableRepairedStatusChanged(singleton(first));
+        SSTableRepairStatusChanged repaired = (SSTableRepairStatusChanged) recorder.received.get(0);
+        Assert.assertEquals(singleton(first), repaired.sstable);
+        recorder.received.clear();
+
+        // memtable renewed
+        Memtable memtable = MockSchema.memtable();
+        tracker.notifyRenewed(memtable);
+        MemtableRenewedNotification renewed = (MemtableRenewedNotification) recorder.received.get(0);
+        Assert.assertEquals(memtable, renewed.renewed);
+        recorder.received.clear();
+
+        // re-subscribe so that the MockListener(true) instance sits ahead of the recorder
+        tracker.unsubscribe(recorder);
+        MockListener failing = new MockListener(true);
+        tracker.subscribe(failing);
+        tracker.subscribe(recorder);
+
+        Assert.assertNotNull(tracker.notifyAdded(first, null));
+        SSTableAddedNotification addedAgain = (SSTableAddedNotification) recorder.received.get(0);
+        Assert.assertEquals(first, addedAgain.added);
+        recorder.received.clear();
+
+        Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(first), singleton(second), OperationType.COMPACTION, null));
+        SSTableListChangedNotification changedAgain = (SSTableListChangedNotification) recorder.received.get(0);
+        Assert.assertEquals(singleton(first), changedAgain.removed);
+        Assert.assertEquals(singleton(second), changedAgain.added);
+        recorder.received.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
new file mode 100644
index 0000000..811e025
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -0,0 +1,202 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Iterables.concat;
+import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
+
+/**
+ * Unit tests for {@link View}: bounds lookup, plus the static functions and predicates
+ * (permitCompacting, updateCompacting, updateLiveSet, switchMemtable, markFlushing, replaceFlushed)
+ * that are applied to one view to derive the next. Every test starts from a synthetic view built
+ * by {@link #fakeView(int, int)}.
+ */
+public class ViewTest
+{
+
+    /**
+     * For a view of five sstables, queries every range between the bounds of reader i and reader j
+     * (i <= j) with each inclusive/exclusive combination and checks how many sstables are returned.
+     */
+    @Test
+    public void testSSTablesInBounds()
+    {
+        View initialView = fakeView(0, 5);
+        for (int i = 0 ; i < 5 ; i++)
+        {
+            for (int j = i ; j < 5 ; j++)
+            {
+                RowPosition min = MockSchema.readerBounds(i);
+                RowPosition max = MockSchema.readerBounds(j);
+                for (boolean minInc : new boolean[] { true, false} )
+                {
+                    for (boolean maxInc : new boolean[] { true, false} )
+                    {
+                        // a degenerate range (i == j) is only queried when both ends are inclusive
+                        if (i == j && !(minInc && maxInc))
+                            continue;
+                        List<SSTableReader> r = initialView.sstablesInBounds(AbstractBounds.bounds(min, minInc, max, maxInc));
+                        // an exclusive lower bound drops one reader, an inclusive upper bound adds one
+                        Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Exercises the compacting-set bookkeeping: marking and unmarking readers as compacting,
+     * rejecting invalid updates, and replacing live readers with equal-but-distinct instances.
+     */
+    @Test
+    public void testCompaction()
+    {
+        View initialView = fakeView(0, 5);
+        View cur = initialView;
+        List<SSTableReader> readers = ImmutableList.copyOf(initialView.sstables);
+        Assert.assertTrue(View.permitCompacting(readers).apply(cur));
+        // check we permit compacting duplicates in the predicate, so we don't spin infinitely if there is a screw up
+        Assert.assertTrue(View.permitCompacting(ImmutableList.copyOf(concat(readers, readers))).apply(cur));
+        // check we fail in the application in the presence of duplicates
+        testFailure(View.updateCompacting(emptySet(), concat(readers.subList(0, 1), readers.subList(0, 1))), cur);
+
+        // do lots of trivial checks that the compacting set and related methods behave properly for a simple update
+        cur = View.updateCompacting(emptySet(), readers.subList(0, 2)).apply(cur);
+        Assert.assertTrue(View.permitCompacting(readers.subList(2, 5)).apply(cur));
+        Assert.assertFalse(View.permitCompacting(readers.subList(0, 2)).apply(cur));
+        Assert.assertFalse(View.permitCompacting(readers.subList(0, 1)).apply(cur));
+        Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur));
+        Assert.assertTrue(readers.subList(2, 5).containsAll(copyOf(cur.getUncompacting(readers))));
+        Assert.assertEquals(3, copyOf(cur.getUncompacting(readers)).size());
+        Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(2, 5)));
+        Assert.assertEquals(3, cur.nonCompactingSStables().size());
+
+        // check marking already compacting readers fails with an exception
+        testFailure(View.updateCompacting(emptySet(), readers.subList(0, 1)), cur);
+        testFailure(View.updateCompacting(emptySet(), readers.subList(1, 2)), cur);
+        testFailure(View.updateCompacting(copyOf(readers.subList(0, 1)), readers.subList(1, 2)), cur);
+
+        // make equivalents of readers.subList(0, 3) that are different instances
+        SSTableReader r0 = MockSchema.sstable(0), r1 = MockSchema.sstable(1), r2 = MockSchema.sstable(2);
+        // attempt to mark compacting a version not in the live set
+        testFailure(View.updateCompacting(emptySet(), of(r2)), cur);
+        // update one compacting, one non-compacting, of the liveset to another instance of the same readers;
+        // confirm liveset changes but compacting does not
+        cur = View.updateLiveSet(copyOf(readers.subList(1, 3)), of(r1, r2)).apply(cur);
+        Assert.assertSame(readers.get(0), cur.sstablesMap.get(r0));
+        Assert.assertSame(r1, cur.sstablesMap.get(r1));
+        Assert.assertSame(r2, cur.sstablesMap.get(r2));
+        testFailure(View.updateCompacting(emptySet(), readers.subList(2, 3)), cur);
+        Assert.assertSame(readers.get(1), Iterables.getFirst(Iterables.filter(cur.compacting, Predicates.equalTo(r1)), null));
+
+        // unmark compacting, and check our methods are all correctly updated
+        cur = View.updateCompacting(copyOf(readers.subList(0, 1)), emptySet()).apply(cur);
+        Assert.assertTrue(View.permitCompacting(concat(readers.subList(0, 1), of(r2), readers.subList(3, 5))).apply(cur));
+        Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur));
+        testFailure(View.updateCompacting(emptySet(), readers.subList(1, 2)), cur);
+        testFailure(View.updateCompacting(copyOf(readers.subList(0, 2)), emptySet()), cur);
+        Assert.assertTrue(copyOf(concat(readers.subList(0, 1), readers.subList(2, 5))).containsAll(copyOf(cur.getUncompacting(readers))));
+        Assert.assertEquals(4, copyOf(cur.getUncompacting(readers)).size());
+        Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(2, 5)));
+        Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(0, 1)));
+        Assert.assertEquals(4, cur.nonCompactingSStables().size());
+    }
+
+    /**
+     * Asserts that applying {@code function} to {@code view} throws.
+     *
+     * @param function the view transformation expected to be rejected
+     * @param view     the view to apply it to
+     */
+    private static void testFailure(Function<View, ?> function, View view)
+    {
+        boolean failed = true;
+        try
+        {
+            function.apply(view);
+            failed = false;
+        }
+        catch (Throwable expected)
+        {
+            // deliberately broad and ignored: the callers only care that the update is rejected,
+            // not which kind of Throwable signals it
+        }
+        Assert.assertTrue("expected applying the function to the view to throw", failed);
+    }
+
+    /**
+     * Exercises the memtable life cycle on a view that starts with a single memtable: switching in
+     * new memtables, marking older ones as flushing, and replacing flushed memtables with either
+     * nothing or an sstable.
+     */
+    @Test
+    public void testFlushing()
+    {
+        View initialView = fakeView(1, 0);
+        View cur = initialView;
+        Memtable memtable1 = initialView.getCurrentMemtable();
+        Memtable memtable2 = MockSchema.memtable();
+
+        cur = View.switchMemtable(memtable2).apply(cur);
+        Assert.assertEquals(2, cur.liveMemtables.size());
+        Assert.assertEquals(memtable1, cur.liveMemtables.get(0));
+        Assert.assertEquals(memtable2, cur.getCurrentMemtable());
+
+        Memtable memtable3 = MockSchema.memtable();
+        cur = View.switchMemtable(memtable3).apply(cur);
+        Assert.assertEquals(3, cur.liveMemtables.size());
+        Assert.assertEquals(0, cur.flushingMemtables.size());
+        Assert.assertEquals(memtable1, cur.liveMemtables.get(0));
+        Assert.assertEquals(memtable2, cur.liveMemtables.get(1));
+        Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+        // replacing a memtable that has not been marked flushing must be rejected
+        testFailure(View.replaceFlushed(memtable2, null), cur);
+
+        cur = View.markFlushing(memtable2).apply(cur);
+        Assert.assertTrue(cur.flushingMemtables.contains(memtable2));
+        Assert.assertEquals(2, cur.liveMemtables.size());
+        Assert.assertEquals(1, cur.flushingMemtables.size());
+        Assert.assertEquals(memtable2, cur.flushingMemtables.get(0));
+        Assert.assertEquals(memtable1, cur.liveMemtables.get(0));
+        Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+        // memtable1 is marked flushing after memtable2 but is expected ahead of it in the flushing list
+        cur = View.markFlushing(memtable1).apply(cur);
+        Assert.assertEquals(1, cur.liveMemtables.size());
+        Assert.assertEquals(2, cur.flushingMemtables.size());
+        Assert.assertEquals(memtable1, cur.flushingMemtables.get(0));
+        Assert.assertEquals(memtable2, cur.flushingMemtables.get(1));
+        Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+        cur = View.replaceFlushed(memtable2, null).apply(cur);
+        Assert.assertEquals(1, cur.liveMemtables.size());
+        Assert.assertEquals(1, cur.flushingMemtables.size());
+        Assert.assertEquals(memtable1, cur.flushingMemtables.get(0));
+        Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+
+        SSTableReader sstable = MockSchema.sstable(1);
+        cur = View.replaceFlushed(memtable1, sstable).apply(cur);
+        Assert.assertEquals(0, cur.flushingMemtables.size());
+        Assert.assertEquals(1, cur.liveMemtables.size());
+        Assert.assertEquals(memtable3, cur.getCurrentMemtable());
+        Assert.assertEquals(1, cur.sstables.size());
+        Assert.assertEquals(sstable, cur.sstablesMap.get(sstable));
+    }
+
+    /**
+     * Builds a synthetic view for the tests in this package.
+     *
+     * @param memtableCount number of memtables to create with MockSchema.memtable()
+     * @param sstableCount  number of sstables to create with MockSchema.sstable(i), i counting from 0
+     * @return a view over those memtables and sstables; the remaining constructor arguments are an
+     *         empty memtable list and an empty sstable set (presumably the flushing memtables and
+     *         the compacting set - confirm against the View constructor)
+     */
+    static View fakeView(int memtableCount, int sstableCount)
+    {
+        List<Memtable> memtables = new ArrayList<>();
+        List<SSTableReader> sstables = new ArrayList<>();
+        for (int i = 0 ; i < memtableCount ; i++)
+            memtables.add(MockSchema.memtable());
+        for (int i = 0 ; i < sstableCount ; i++)
+            sstables.add(MockSchema.sstable(i));
+        return new View(ImmutableList.copyOf(memtables), Collections.<Memtable>emptyList(), Helpers.identityMap(sstables),
+                        Collections.<SSTableReader>emptySet(), SSTableIntervalTree.build(sstables));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index eac6094..518d80e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -41,13 +41,15 @@ import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 
+import static com.google.common.collect.ImmutableMap.of;
+import static java.util.Arrays.asList;
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
 import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
@@ -121,12 +123,15 @@ public class IndexSummaryManagerTest
         return total;
     }
 
-    private static List<SSTableReader> resetSummaries(List<SSTableReader> sstables, long originalOffHeapSize) throws IOException
+    private static List<SSTableReader> resetSummaries(ColumnFamilyStore cfs, List<SSTableReader> sstables, long originalOffHeapSize) throws IOException
     {
         for (SSTableReader sstable : sstables)
             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, originalOffHeapSize * sstables.size());
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size());
+        }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
 
@@ -230,7 +235,11 @@ public class IndexSummaryManagerTest
         cfs.metadata.minIndexInterval(originalMinIndexInterval / 2);
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         long summarySpace = sstable.getIndexSummaryOffHeapSize();
-        IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), summarySpace);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+        {
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace);
+        }
+
         sstable = cfs.getSSTables().iterator().next();
         assertEquals(originalMinIndexInterval, sstable.getEffectiveIndexInterval(), 0.001);
         assertEquals(numRows / originalMinIndexInterval, sstable.getIndexSummarySize());
@@ -238,7 +247,10 @@ public class IndexSummaryManagerTest
         // keep the min_index_interval the same, but now give the summary enough space to grow by 50%
         double previousInterval = sstable.getEffectiveIndexInterval();
         int previousSize = sstable.getIndexSummarySize();
-        IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), (long) Math.ceil(summarySpace * 1.5));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+        {
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5));
+        }
         sstable = cfs.getSSTables().iterator().next();
         assertEquals(previousSize * 1.5, (double) sstable.getIndexSummarySize(), 1);
         assertEquals(previousInterval * (1.0 / 1.5), sstable.getEffectiveIndexInterval(), 0.001);
@@ -246,7 +258,10 @@ public class IndexSummaryManagerTest
         // return min_index_interval to it's original value (double it), but only give the summary enough space
         // to have an effective index interval of twice the new min
         cfs.metadata.minIndexInterval(originalMinIndexInterval);
-        IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), (long) Math.ceil(summarySpace / 2.0));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+        {
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0));
+        }
         sstable = cfs.getSSTables().iterator().next();
         assertEquals(originalMinIndexInterval * 2, sstable.getEffectiveIndexInterval(), 0.001);
         assertEquals(numRows / (originalMinIndexInterval * 2), sstable.getIndexSummarySize());
@@ -256,7 +271,10 @@ public class IndexSummaryManagerTest
         // result in an effective interval above the new max)
         cfs.metadata.minIndexInterval(originalMinIndexInterval * 4);
         cfs.metadata.maxIndexInterval(originalMinIndexInterval * 4);
-        IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), 10);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
+        {
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+        }
         sstable = cfs.getSSTables().iterator().next();
         assertEquals(cfs.metadata.getMinIndexInterval(), sstable.getEffectiveIndexInterval(), 0.001);
     }
@@ -276,14 +294,20 @@ public class IndexSummaryManagerTest
         for (SSTableReader sstable : sstables)
             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
-        IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+        }
         sstables = new ArrayList<>(cfs.getSSTables());
         for (SSTableReader sstable : sstables)
             assertEquals(cfs.metadata.getMaxIndexInterval(), sstable.getEffectiveIndexInterval(), 0.01);
 
         // halve the max_index_interval
         cfs.metadata.maxIndexInterval(cfs.metadata.getMaxIndexInterval() / 2);
-        IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+        }
         sstables = new ArrayList<>(cfs.getSSTables());
         for (SSTableReader sstable : sstables)
         {
@@ -293,7 +317,10 @@ public class IndexSummaryManagerTest
 
         // return max_index_interval to its original value
         cfs.metadata.maxIndexInterval(cfs.metadata.getMaxIndexInterval() * 2);
-        IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+        }
         for (SSTableReader sstable : cfs.getSSTables())
         {
             assertEquals(cfs.metadata.getMaxIndexInterval(), sstable.getEffectiveIndexInterval(), 0.01);
@@ -321,7 +348,10 @@ public class IndexSummaryManagerTest
         long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
 
         // there should be enough space to not downsample anything
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+        }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
         assertEquals(singleSummaryOffHeapSpace * numSSTables, totalOffHeapSize(sstables));
@@ -329,26 +359,38 @@ public class IndexSummaryManagerTest
 
         // everything should get cut in half
         assert sstables.size() == 4;
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+        }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
         validateData(cfs, numRows);
 
         // everything should get cut to a quarter
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 4)));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4)));
+        }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel());
         validateData(cfs, numRows);
 
         // upsample back up to half
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables,(singleSummaryOffHeapSpace * (numSSTables / 2) + 4));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4));
+        }
         assert sstables.size() == 4;
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
         validateData(cfs, numRows);
 
         // upsample back up to the original index summary
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+        }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
         validateData(cfs, numRows);
@@ -357,7 +399,10 @@ public class IndexSummaryManagerTest
         // so the two cold sstables should get downsampled to be half of their original size
         sstables.get(0).overrideReadMeter(new RestorableMeter(50.0, 50.0));
         sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0));
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+        }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
@@ -370,7 +415,10 @@ public class IndexSummaryManagerTest
         double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 0.10));
         sstables.get(0).overrideReadMeter(new RestorableMeter(lowerRate, lowerRate));
         sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, higherRate));
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+        }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
@@ -379,13 +427,16 @@ public class IndexSummaryManagerTest
         validateData(cfs, numRows);
 
         // reset, and then this time, leave enough space for one of the cold sstables to not get downsampled
-        sstables = resetSummaries(sstables, singleSummaryOffHeapSpace);
+        sstables = resetSummaries(cfs, sstables, singleSummaryOffHeapSpace);
         sstables.get(0).overrideReadMeter(new RestorableMeter(1.0, 1.0));
         sstables.get(1).overrideReadMeter(new RestorableMeter(2.0, 2.0));
         sstables.get(2).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
         sstables.get(3).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
 
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3) + 50);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50);
+        }
         Collections.sort(sstables, hotnessComparator);
 
         if (sstables.get(0).getIndexSummarySamplingLevel() == minSamplingLevel)
@@ -406,7 +457,10 @@ public class IndexSummaryManagerTest
         sstables.get(1).overrideReadMeter(new RestorableMeter(0.0, 0.0));
         sstables.get(2).overrideReadMeter(new RestorableMeter(92, 92));
         sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0));
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
+        }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(1, sstables.get(0).getIndexSummarySize());  // at the min sampling level
         assertEquals(1, sstables.get(0).getIndexSummarySize());  // at the min sampling level
@@ -416,7 +470,10 @@ public class IndexSummaryManagerTest
         validateData(cfs, numRows);
 
         // Don't leave enough space for even the minimal index summaries
-        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 10);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+        }
         for (SSTableReader sstable : sstables)
             assertEquals(1, sstable.getIndexSummarySize());  // at the min sampling level
         validateData(cfs, numRows);
@@ -449,19 +506,19 @@ public class IndexSummaryManagerTest
         SSTableReader original = sstables.get(0);
 
         SSTableReader sstable = original;
-        for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            SSTableReader prev = sstable;
-            sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel);
-            assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
-            int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL);
-            assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
-            if (prev != original)
-                prev.selfRef().release();
+            for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
+            {
+                sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel);
+                assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
+                int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL);
+                assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
+                txn.update(sstable, true);
+                txn.checkpoint();
+            }
+            txn.finish();
         }
-
-        // don't leave replaced SSTRs around to break other tests
-        cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 82cb8d5..682d999 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -59,7 +59,9 @@ import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
 import org.apache.cassandra.dht.Range;
@@ -498,8 +500,13 @@ public class SSTableReaderTest
             }));
         }
 
-        SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
-        store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement));
+        SSTableReader replacement;
+        try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN))
+        {
+            replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
+            txn.update(replacement, true);
+            txn.finish();
+        }
         for (Future future : futures)
             future.get();