You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:58 UTC

[4/6] cassandra git commit: Extend Transactional API to sstable lifecycle management

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
new file mode 100644
index 0000000..acc9747
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static com.google.common.base.Functions.compose;
+import static com.google.common.base.Predicates.*;
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static com.google.common.collect.Iterables.*;
+import static java.util.Collections.singleton;
+import static org.apache.cassandra.db.lifecycle.Helpers.*;
+import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
+import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.concurrent.Refs.release;
+import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
+
+public class LifecycleTransaction extends Transactional.AbstractTransactional
+{
+    private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
+
+    /**
+     * a class that represents accumulated modifications to the Tracker.
+     * has two instances, one containing modifications that are "staged" (i.e. invisible)
+     * and one containing those "logged" that have been made visible through a call to checkpoint()
+     */
+    private static class State
+    {
+        // readers that are either brand new, update a previous new reader, or update one of the original readers
+        final Set<SSTableReader> update = new HashSet<>();
+        // disjoint from update, represents a subset of originals that is no longer needed
+        final Set<SSTableReader> obsolete = new HashSet<>();
+
+        void log(State staged)
+        {
+            update.removeAll(staged.obsolete);
+            update.removeAll(staged.update);
+            update.addAll(staged.update);
+            obsolete.addAll(staged.obsolete);
+        }
+
+        boolean contains(SSTableReader reader)
+        {
+            return update.contains(reader) || obsolete.contains(reader);
+        }
+
+        boolean isEmpty()
+        {
+            return update.isEmpty() && obsolete.isEmpty();
+        }
+
+        void clear()
+        {
+            update.clear();
+            obsolete.clear();
+        }
+    }
+
+    public final Tracker tracker;
+    private final OperationType operationType;
+    // the original readers this transaction was opened over, and that it guards
+    // (no other transactions may operate over these readers concurrently)
+    private final Set<SSTableReader> originals = new HashSet<>();
+    // the set of readers we've marked as compacting (only updated on creation and in checkpoint())
+    private final Set<SSTableReader> marked = new HashSet<>();
+    // the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
+    // same version of a reader. potentially a dangerous property if there are reference counting bugs
+    // as they won't be caught until the transaction's lifespan is over.
+    private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<UniqueIdentifier, Boolean>());
+
+    // changes that have been made visible
+    private final State logged = new State();
+    // changes that are pending
+    private final State staged = new State();
+
+    /**
+     * construct a Transaction for use in an offline operation
+     */
+    public static LifecycleTransaction offline(OperationType operationType, SSTableReader reader)
+    {
+        return offline(operationType, singleton(reader));
+    }
+
+    /**
+     * construct a Transaction for use in an offline operation
+     */
+    public static LifecycleTransaction offline(OperationType operationType, Iterable<SSTableReader> readers)
+    {
+        // if offline, for simplicity we just use a dummy tracker
+        Tracker dummy = new Tracker(null, false);
+        dummy.addInitialSSTables(readers);
+        dummy.apply(updateCompacting(emptySet(), readers));
+        return new LifecycleTransaction(dummy, operationType, readers);
+    }
+
+    LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
+    {
+        this.tracker = tracker;
+        this.operationType = operationType;
+        for (SSTableReader reader : readers)
+        {
+            originals.add(reader);
+            marked.add(reader);
+            identities.add(reader.instanceId);
+        }
+    }
+
+    public void doPrepare()
+    {
+        // note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
+        // separately: the second prepareToCommit is ignored as a "redundant" transition. since it is only a checkpoint
+        // (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
+        // it may break this use case, and care is needed
+        checkpoint();
+    }
+
+    /**
+     * point of no return: commit all changes, but leave all readers marked as compacting
+     */
+    public Throwable doCommit(Throwable accumulate)
+    {
+        assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
+
+        logger.debug("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
+
+        // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
+        // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
+        // and notification status for the obsolete and new files
+        accumulate = setupDeleteNotification(logged.update, tracker, accumulate);
+        accumulate = markObsolete(logged.obsolete, accumulate);
+        accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
+        accumulate = release(selfRefs(logged.obsolete), accumulate);
+        accumulate = tracker.notifySSTablesChanged(originals, logged.update, operationType, accumulate);
+        return accumulate;
+    }
+
+    /**
+     * undo all of the changes made by this transaction, resetting the state to its original form
+     */
+    public Throwable doAbort(Throwable accumulate)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
+
+        if (logged.isEmpty() && staged.isEmpty())
+            return accumulate;
+
+        // mark obsolete all readers that are not versions of those present in the original set
+        Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
+        logger.debug("Obsoleting {}", obsolete);
+        accumulate = markObsolete(obsolete, accumulate);
+
+        // replace all updated readers with a version restored to its original state
+        accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate);
+        // setReplaced immediately preceding versions that have not been obsoleted
+        accumulate = setReplaced(logged.update, accumulate);
+        // we have replaced all of logged.update and never made visible staged.update,
+        // and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
+        // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
+        // or is still in its original form (so left as is); in either case no extra action is needed
+        accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);
+        logged.clear();
+        staged.clear();
+        return accumulate;
+    }
+
+    @Override
+    protected Throwable doPostCleanup(Throwable accumulate)
+    {
+        return unmarkCompacting(marked, accumulate);
+    }
+
+    public void permitRedundantTransitions()
+    {
+        super.permitRedundantTransitions();
+    }
+
+    /**
+     * call when a consistent batch of changes is ready to be made atomically visible
+     * these will be exposed in the Tracker atomically, or an exception will be thrown; in this case
+     * the transaction should be rolled back
+     */
+    public void checkpoint()
+    {
+        maybeFail(checkpoint(null));
+    }
+    private Throwable checkpoint(Throwable accumulate)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Checkpointing update:{}, obsolete:{}", staged.update, staged.obsolete);
+
+        if (staged.isEmpty())
+            return accumulate;
+
+        Set<SSTableReader> toUpdate = toUpdate();
+        Set<SSTableReader> fresh = copyOf(fresh());
+
+        // check the current versions of the readers we're replacing haven't somehow been replaced by someone else
+        checkNotReplaced(filterIn(toUpdate, staged.update));
+
+        // ensure any new readers are in the compacting set, since we aren't done with them yet
+        // and don't want anyone else messing with them
+        // apply atomically along with updating the live set of readers
+        tracker.apply(compose(updateCompacting(emptySet(), fresh),
+                              updateLiveSet(toUpdate, staged.update)));
+
+        // log the staged changes and our newly marked readers
+        marked.addAll(fresh);
+        logged.log(staged);
+
+        // setup our tracker, and mark our prior versions replaced, also releasing our references to them
+        // we do not replace/release obsoleted readers, since we may need to restore them on rollback
+        accumulate = setReplaced(filterOut(toUpdate, staged.obsolete), accumulate);
+        accumulate = release(selfRefs(filterOut(toUpdate, staged.obsolete)), accumulate);
+
+        staged.clear();
+        return accumulate;
+    }
+
+    /**
+     * update a reader: if !original, this is a reader that is being introduced by this transaction;
+     * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
+     */
+    public void update(SSTableReader reader, boolean original)
+    {
+        assert !staged.update.contains(reader) : "each reader may only be updated once per checkpoint: " + reader;
+        assert !identities.contains(reader.instanceId) : "each reader instance may only be provided as an update once: " + reader;
+        // check it isn't obsolete, and that it matches the original flag
+        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not update a reader that has been obsoleted";
+        assert original == originals.contains(reader) : String.format("the 'original' indicator was incorrect (%s provided): %s", original, reader);
+        staged.update.add(reader);
+        identities.add(reader.instanceId);
+        reader.setupKeyCache();
+    }
+
+    /**
+     * mark this reader as for obsoletion. this does not actually obsolete the reader until commit() is called,
+     * but on checkpoint() the reader will be removed from the live set
+     */
+    public void obsolete(SSTableReader reader)
+    {
+        logger.debug("Staging for obsolescence {}", reader);
+        // check this is: a reader guarded by the transaction, an instance we have already worked with
+        // and that we haven't already obsoleted it, nor do we have other changes staged for it
+        assert identities.contains(reader.instanceId) : "only reader instances that have previously been provided may be obsoleted: " + reader;
+        assert originals.contains(reader) : "only readers in the 'original' set may be obsoleted: " + reader + " vs " + originals;
+        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not obsolete a reader that has already been obsoleted: " + reader;
+        assert !staged.update.contains(reader) : "may not obsolete a reader that has a staged update (must checkpoint first): " + reader;
+        assert current(reader) == reader : "may only obsolete the latest version of the reader: " + reader;
+        staged.obsolete.add(reader);
+    }
+
+    /**
+     * obsolete every file in the original transaction
+     */
+    public void obsoleteOriginals()
+    {
+        logger.debug("Staging for obsolescence {}", originals);
+        // if we're obsoleting, we should have no staged updates for the original files
+        assert Iterables.isEmpty(filterIn(staged.update, originals)) : staged.update;
+
+        // stage obsoletes for any currently visible versions of any original readers
+        Iterables.addAll(staged.obsolete, filterIn(current(), originals));
+    }
+
+    /**
+     * return the readers we're replacing in checkpoint(), i.e. the currently visible version of those in staged
+     */
+    private Set<SSTableReader> toUpdate()
+    {
+        return copyOf(filterIn(current(), staged.obsolete, staged.update));
+    }
+
+    /**
+     * new readers that haven't appeared previously (either in the original set or the logged updates)
+     */
+    private Iterable<SSTableReader> fresh()
+    {
+        return filterOut(staged.update,
+                         originals, logged.update);
+    }
+
+    /**
+     * returns the currently visible readers managed by this transaction
+     */
+    public Iterable<SSTableReader> current()
+    {
+        // i.e., those that are updates that have been logged (made visible),
+        // and any original readers that have neither been obsoleted nor updated
+        return concat(logged.update, filterOut(originals, logged.update, logged.obsolete));
+    }
+
+    /**
+     * update the current replacement of any original reader back to its original start
+     */
+    private List<SSTableReader> restoreUpdatedOriginals()
+    {
+        Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
+        return ImmutableList.copyOf(transform(torestore,
+                                              new Function<SSTableReader, SSTableReader>()
+                                              {
+                                                  public SSTableReader apply(SSTableReader reader)
+                                                  {
+                                                      return current(reader).cloneWithNewStart(reader.first, null);
+                                                  }
+                                              }));
+    }
+
+    /**
+     * the set of readers guarded by this transaction _in their original instance/state_
+     * call current(SSTableReader) on any reader in this set to get the latest instance
+     */
+    public Set<SSTableReader> originals()
+    {
+        return Collections.unmodifiableSet(originals);
+    }
+
+    /**
+     * indicates if the reader has been marked for obsoletion
+     */
+    public boolean isObsolete(SSTableReader reader)
+    {
+        return logged.obsolete.contains(reader) || staged.obsolete.contains(reader);
+    }
+
+    /**
+     * return the current version of the provided reader, whether or not it is visible or staged;
+     * i.e. returns the first version present by testing staged, logged and originals in order.
+     */
+    public SSTableReader current(SSTableReader reader)
+    {
+        Set<SSTableReader> container;
+        if (staged.contains(reader))
+            container = staged.update.contains(reader) ? staged.update : staged.obsolete;
+        else if (logged.contains(reader))
+            container = logged.update.contains(reader) ? logged.update : logged.obsolete;
+        else if (originals.contains(reader))
+            container = originals;
+        else throw new AssertionError();
+        return select(reader, container);
+    }
+
+    /**
+     * remove the reader from the set we're modifying
+     */
+    public void cancel(SSTableReader cancel)
+    {
+        logger.debug("Cancelling {} from transaction", cancel);
+        assert originals.contains(cancel) : "may only cancel a reader in the 'original' set: " + cancel + " vs " + originals;
+        assert !(staged.contains(cancel) || logged.contains(cancel)) : "may only cancel a reader that has not been updated or obsoleted in this transaction: " + cancel;
+        originals.remove(cancel);
+        marked.remove(cancel);
+        maybeFail(unmarkCompacting(singleton(cancel), null));
+    }
+
+    /**
+     * remove the readers from the set we're modifying
+     */
+    public void cancel(Iterable<SSTableReader> cancels)
+    {
+        for (SSTableReader cancel : cancels)
+            cancel(cancel);
+    }
+
+    /**
+     * remove the provided readers from this Transaction, and return a new Transaction to manage them
+     * only permitted to be called if the current Transaction has never been used
+     */
+    public LifecycleTransaction split(Collection<SSTableReader> readers)
+    {
+        logger.debug("Splitting {} into new transaction", readers);
+        checkUnused();
+        for (SSTableReader reader : readers)
+            assert identities.contains(reader.instanceId) : "may only split the same reader instance the transaction was opened with: " + reader;
+
+        for (SSTableReader reader : readers)
+        {
+            identities.remove(reader.instanceId);
+            originals.remove(reader);
+            marked.remove(reader);
+        }
+        return new LifecycleTransaction(tracker, operationType, readers);
+    }
+
+    /**
+     * check this transaction has never been used
+     */
+    private void checkUnused()
+    {
+        assert logged.isEmpty();
+        assert staged.isEmpty();
+        assert identities.size() == originals.size();
+        assert originals.size() == marked.size();
+    }
+
+    private Throwable unmarkCompacting(Set<SSTableReader> unmark, Throwable accumulate)
+    {
+        accumulate = tracker.apply(updateCompacting(unmark, emptySet()), accumulate);
+        // when the CFS is invalidated, it will call unreferenceSSTables().  However, unreferenceSSTables only deals
+        // with sstables that aren't currently being compacted.  If there are ongoing compactions that finish or are
+        // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
+        accumulate = tracker.dropSSTablesIfInvalid(accumulate);
+        return accumulate;
+    }
+
+    // convenience method for callers that know only one sstable is involved in the transaction
+    public SSTableReader onlyOne()
+    {
+        assert originals.size() == 1;
+        return getFirst(originals, null);
+    }
+
+    // a class representing the current state of the reader within this transaction, encoding the actions both logged
+    // and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
+    // indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
+    // by the corresponding Action)
+    @VisibleForTesting
+    public static class ReaderState
+    {
+        public static enum Action
+        {
+            UPDATED, OBSOLETED, NONE;
+            public static Action get(boolean updated, boolean obsoleted)
+            {
+                assert !(updated && obsoleted);
+                return updated ? UPDATED : obsoleted ? OBSOLETED : NONE;
+            }
+        }
+
+        final Action staged;
+        final Action logged;
+        final SSTableReader nextVisible;
+        final SSTableReader currentlyVisible;
+        final boolean original;
+
+        public ReaderState(Action logged, Action staged, SSTableReader currentlyVisible, SSTableReader nextVisible, boolean original)
+        {
+            this.staged = staged;
+            this.logged = logged;
+            this.currentlyVisible = currentlyVisible;
+            this.nextVisible = nextVisible;
+            this.original = original;
+        }
+
+        public boolean equals(Object that)
+        {
+            return that instanceof ReaderState && equals((ReaderState) that);
+        }
+
+        public boolean equals(ReaderState that)
+        {
+            return this.staged == that.staged && this.logged == that.logged && this.original == that.original
+                && this.currentlyVisible == that.currentlyVisible && this.nextVisible == that.nextVisible;
+        }
+
+        public String toString()
+        {
+            return String.format("[logged=%s staged=%s original=%s]", logged, staged, original);
+        }
+
+        public static SSTableReader visible(SSTableReader reader, Predicate<SSTableReader> obsolete, Collection<SSTableReader> ... selectFrom)
+        {
+            return obsolete.apply(reader) ? null : selectFirst(reader, selectFrom);
+        }
+    }
+
+    @VisibleForTesting
+    public ReaderState state(SSTableReader reader)
+    {
+        SSTableReader currentlyVisible = ReaderState.visible(reader, in(logged.obsolete), logged.update, originals);
+        SSTableReader nextVisible = ReaderState.visible(reader, orIn(staged.obsolete, logged.obsolete), staged.update, logged.update, originals);
+        return new ReaderState(ReaderState.Action.get(logged.update.contains(reader), logged.obsolete.contains(reader)),
+                               ReaderState.Action.get(staged.update.contains(reader), staged.obsolete.contains(reader)),
+                               currentlyVisible, nextVisible, originals.contains(reader)
+        );
+    }
+
+    public String toString()
+    {
+        return originals.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
new file mode 100644
index 0000000..ff2abcb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
@@ -0,0 +1,40 @@
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
+{
+    private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);
+
+    SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
+    {
+        super(intervals);
+    }
+
+    public static SSTableIntervalTree empty()
+    {
+        return EMPTY;
+    }
+
+    public static SSTableIntervalTree build(Iterable<SSTableReader> sstables)
+    {
+        return new SSTableIntervalTree(buildIntervals(sstables));
+    }
+
+    public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
+    {
+        List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
+        for (SSTableReader sstable : sstables)
+            intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
+        return intervals;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
new file mode 100644
index 0000000..50f567f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static com.google.common.base.Predicates.and;
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static com.google.common.collect.Iterables.filter;
+import static java.util.Collections.singleton;
+import static org.apache.cassandra.db.lifecycle.Helpers.*;
+import static org.apache.cassandra.db.lifecycle.View.permitCompacting;
+import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
+import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+import static org.apache.cassandra.utils.concurrent.Refs.release;
+import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
+
+public class Tracker
+{
+    private static final Logger logger = LoggerFactory.getLogger(Tracker.class);
+
+    public final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
+    public final ColumnFamilyStore cfstore;
+    final AtomicReference<View> view;
+    public final boolean loadsstables;
+
+    public Tracker(ColumnFamilyStore cfstore, boolean loadsstables)
+    {
+        this.cfstore = cfstore;
+        this.view = new AtomicReference<>();
+        this.loadsstables = loadsstables;
+        this.reset();
+    }
+
+    public LifecycleTransaction tryModify(SSTableReader sstable, OperationType operationType)
+    {
+        return tryModify(singleton(sstable), operationType);
+    }
+
+    /**
+     * @return a Transaction over the provided sstables if we are able to mark the given @param sstables as compacted, before anyone else
+     */
+    public LifecycleTransaction tryModify(Iterable<SSTableReader> sstables, OperationType operationType)
+    {
+        if (Iterables.isEmpty(sstables))
+            return new LifecycleTransaction(this, operationType, sstables);
+        if (null == apply(permitCompacting(sstables), updateCompacting(emptySet(), sstables)))
+            return null;
+        return new LifecycleTransaction(this, operationType, sstables);
+    }
+
+
+    // METHODS FOR ATOMICALLY MODIFYING THE VIEW
+
+    Pair<View, View> apply(Function<View, View> function)
+    {
+        return apply(Predicates.<View>alwaysTrue(), function);
+    }
+
+    Throwable apply(Function<View, View> function, Throwable accumulate)
+    {
+        try
+        {
+            apply(function);
+        }
+        catch (Throwable t)
+        {
+            accumulate = merge(accumulate, t);
+        }
+        return accumulate;
+    }
+
+    /**
+     * atomically tests permit against the view and applies function to it, if permit yields true, returning the original;
+     * otherwise the method aborts, returning null
+     */
+    Pair<View, View> apply(Predicate<View> permit, Function<View, View> function)
+    {
+        while (true)
+        {
+            View cur = view.get();
+            if (!permit.apply(cur))
+                return null;
+            View updated = function.apply(cur);
+            if (view.compareAndSet(cur, updated))
+                return Pair.create(cur, updated);
+        }
+    }
+
+    Throwable updateSizeTracking(Iterable<SSTableReader> oldSSTables, Iterable<SSTableReader> newSSTables, Throwable accumulate)
+    {
+        if (cfstore == null)
+            return accumulate;
+
+        long add = 0;
+        for (SSTableReader sstable : newSSTables)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("adding {} to list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name);
+            try
+            {
+                add += sstable.bytesOnDisk();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        long subtract = 0;
+        for (SSTableReader sstable : oldSSTables)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("removing {} from list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name);
+            try
+            {
+                subtract += sstable.bytesOnDisk();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        StorageMetrics.load.inc(add - subtract);
+        cfstore.metric.liveDiskSpaceUsed.inc(add - subtract);
+        // we don't subtract from total until the sstable is deleted
+        cfstore.metric.totalDiskSpaceUsed.inc(add);
+        return accumulate;
+    }
+
+    // SETUP / CLEANUP
+
+    public void addInitialSSTables(Iterable<SSTableReader> sstables)
+    {
+        maybeFail(setupDeleteNotification(sstables, this, null));
+        apply(updateLiveSet(emptySet(), sstables));
+        maybeFail(updateSizeTracking(emptySet(), sstables, null));
+        // no notifications or backup necessary
+    }
+
+    public void addSSTables(Iterable<SSTableReader> sstables)
+    {
+        addInitialSSTables(sstables);
+        for (SSTableReader sstable : sstables)
+        {
+            maybeIncrementallyBackup(sstable);
+            notifyAdded(sstable);
+        }
+    }
+
+    /** (Re)initializes the tracker, purging all references. */
+    @VisibleForTesting
+    public void reset()
+    {
+        view.set(new View(
+                         cfstore != null ? ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(),
+                         ImmutableList.<Memtable>of(),
+                         Collections.<SSTableReader, SSTableReader>emptyMap(),
+                         Collections.<SSTableReader>emptySet(),
+                         SSTableIntervalTree.empty()));
+    }
+
+    public Throwable dropSSTablesIfInvalid(Throwable accumulate)
+    {
+        if (cfstore != null && !cfstore.isValid())
+            accumulate = dropSSTables(accumulate);
+        return accumulate;
+    }
+
+    public void dropSSTables()
+    {
+        maybeFail(dropSSTables(null));
+    }
+
+    public Throwable dropSSTables(Throwable accumulate)
+    {
+        return dropSSTables(Predicates.<SSTableReader>alwaysTrue(), OperationType.UNKNOWN, accumulate);
+    }
+
+    /**
+     * removes all sstables that are not busy compacting.
+     */
+    public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
+    {
+        Pair<View, View> result = apply(new Function<View, View>()
+        {
+            public View apply(View view)
+            {
+                Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
+                return updateLiveSet(toremove, emptySet()).apply(view);
+            }
+        });
+
+        Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
+        assert Iterables.all(removed, remove);
+
+        if (!removed.isEmpty())
+        {
+            // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
+            accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), operationType, accumulate);
+            accumulate = updateSizeTracking(removed, emptySet(), accumulate);
+            accumulate = markObsolete(removed, accumulate);
+            accumulate = release(selfRefs(removed), accumulate);
+        }
+        return accumulate;
+    }
+
+    /**
+     * Removes every SSTable in the directory from the Tracker's view.
+     * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
+     */
+    public void removeUnreadableSSTables(final File directory)
+    {
+        maybeFail(dropSSTables(new Predicate<SSTableReader>()
+        {
+            public boolean apply(SSTableReader reader)
+            {
+                return reader.descriptor.directory.equals(directory);
+            }
+        }, OperationType.UNKNOWN, null));
+    }
+
+
+
+    // FLUSHING
+
+    /**
+     * get the Memtable that the ordered writeOp should be directed to
+     */
+    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
+    {
+        // since any new memtables appended to the list after we fetch it will be for operations started
+        // after us, we can safely assume that we will always find the memtable that 'accepts' us;
+        // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
+
+        // there may be multiple memtables in the list that would 'accept' us, however we only ever choose
+        // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
+        // assign operations to a memtable that was retired/queued before we started)
+        for (Memtable memtable : view.get().liveMemtables)
+        {
+            if (memtable.accepts(opGroup, replayPosition))
+                return memtable;
+        }
+        throw new AssertionError(view.get().liveMemtables.toString());
+    }
+
+    /**
+     * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
+     * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
+     * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable
+     * must be followed by discarding(m), they cannot be interleaved.
+     *
+     * @return the previously active memtable
+     */
+    public Memtable switchMemtable(boolean truncating)
+    {
+        Memtable newMemtable = new Memtable(cfstore);
+        Pair<View, View> result = apply(View.switchMemtable(newMemtable));
+        if (truncating)
+            notifyRenewed(newMemtable);
+
+        return result.left.getCurrentMemtable();
+    }
+
+    public void markFlushing(Memtable memtable)
+    {
+        apply(View.markFlushing(memtable));
+    }
+
+    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    {
+        if (sstable == null)
+        {
+            // sstable may be null if we flushed batchlog and nothing needed to be retained
+            // if it's null, we don't care what state the cfstore is in, we just replace it and continue
+            apply(View.replaceFlushed(memtable, null));
+            return;
+        }
+
+        sstable.setupDeleteNotification(this);
+        sstable.setupKeyCache();
+        // back up before creating a new Snapshot (which makes the new one eligible for compaction)
+        maybeIncrementallyBackup(sstable);
+
+        apply(View.replaceFlushed(memtable, sstable));
+
+        Throwable fail;
+        fail = updateSizeTracking(emptySet(), singleton(sstable), null);
+        // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
+        fail = notifyAdded(sstable, fail);
+
+        if (cfstore != null && !cfstore.isValid())
+            dropSSTables();
+
+        maybeFail(fail);
+    }
+
+
+
+    // MISCELLANEOUS public utility calls
+
+    public Set<SSTableReader> getSSTables()
+    {
+        return view.get().sstables;
+    }
+
+    public Set<SSTableReader> getCompacting()
+    {
+        return view.get().compacting;
+    }
+
+    public Set<SSTableReader> getUncompacting()
+    {
+        return view.get().nonCompactingSStables();
+    }
+
+    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
+    {
+        return view.get().getUncompacting(candidates);
+    }
+
+    public void maybeIncrementallyBackup(final SSTableReader sstable)
+    {
+        if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
+            return;
+
+        File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
+        sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+    }
+
+    public void spaceReclaimed(long size)
+    {
+        if (cfstore != null)
+            cfstore.metric.totalDiskSpaceUsed.dec(size);
+    }
+
+
+
+    // NOTIFICATION
+
+    Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate)
+    {
+        INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
+        for (INotificationConsumer subscriber : subscribers)
+        {
+            try
+            {
+                subscriber.handleNotification(notification, this);
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
+
+    Throwable notifyAdded(SSTableReader added, Throwable accumulate)
+    {
+        INotification notification = new SSTableAddedNotification(added);
+        for (INotificationConsumer subscriber : subscribers)
+        {
+            try
+            {
+                subscriber.handleNotification(notification, this);
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
+
+    public void notifyAdded(SSTableReader added)
+    {
+        maybeFail(notifyAdded(added, null));
+    }
+
+    public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
+    {
+        INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
+        for (INotificationConsumer subscriber : subscribers)
+            subscriber.handleNotification(notification, this);
+    }
+
+    public void notifyDeleting(SSTableReader deleting)
+    {
+        INotification notification = new SSTableDeletingNotification(deleting);
+        for (INotificationConsumer subscriber : subscribers)
+            subscriber.handleNotification(notification, this);
+    }
+
+    public void notifyRenewed(Memtable renewed)
+    {
+        INotification notification = new MemtableRenewedNotification(renewed);
+        for (INotificationConsumer subscriber : subscribers)
+            subscriber.handleNotification(notification, this);
+    }
+
+    public void notifyTruncated(long truncatedAt)
+    {
+        INotification notification = new TruncationNotification(truncatedAt);
+        for (INotificationConsumer subscriber : subscribers)
+            subscriber.handleNotification(notification, this);
+    }
+
+    public void subscribe(INotificationConsumer consumer)
+    {
+        subscribers.add(consumer);
+    }
+
+    public void unsubscribe(INotificationConsumer consumer)
+    {
+        subscribers.remove(consumer);
+    }
+
+    private static Set<SSTableReader> emptySet()
+    {
+        return Collections.emptySet();
+    }
+
+    public View getView()
+    {
+        return view.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
new file mode 100644
index 0000000..0d1100b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Interval;
+
+import static com.google.common.base.Predicates.equalTo;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.filter;
+import static java.util.Collections.singleton;
+import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
+import static org.apache.cassandra.db.lifecycle.Helpers.replace;
+
+/**
+ * An immutable structure holding the current memtable, the memtables pending
+ * flush, the sstables for a column family, and the sstables that are active
+ * in compaction (a subset of the sstables).
+ *
+ * Modifications to instances are all performed via a Function produced by the static methods in this class.
+ * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or
+ * accept and apply the changes to the View.
+ *
+ */
+public class View
+{
+    /**
+     * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
+     * and the new replacement memtable, until all outstanding write operations on the old table complete.
+     * The last item in the list is always the "current" memtable.
+     */
+    public final List<Memtable> liveMemtables;
+    /**
+     * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
+     * flushed. In chronologically ascending order.
+     */
+    public final List<Memtable> flushingMemtables;
+    public final Set<SSTableReader> compacting;
+    public final Set<SSTableReader> sstables;
+    // we use a Map here so that we can easily perform identity checks as well as equality checks.
+    // When marking compacting, we now  indicate if we expect the sstables to be present (by default we do),
+    // and we then check that not only are they all present in the live set, but that the exact instance present is
+    // the one we made our decision to compact against.
+    public final Map<SSTableReader, SSTableReader> sstablesMap;
+
+    public final SSTableIntervalTree intervalTree;
+
+    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+    {
+        assert liveMemtables != null;
+        assert flushingMemtables != null;
+        assert sstables != null;
+        assert compacting != null;
+        assert intervalTree != null;
+
+        this.liveMemtables = liveMemtables;
+        this.flushingMemtables = flushingMemtables;
+
+        this.sstablesMap = sstables;
+        this.sstables = sstablesMap.keySet();
+        this.compacting = compacting;
+        this.intervalTree = intervalTree;
+    }
+
+    public Memtable getCurrentMemtable()
+    {
+        return liveMemtables.get(liveMemtables.size() - 1);
+    }
+
+    /**
+     * @return the active memtable and all the memtables that are pending flush.
+     */
+    public Iterable<Memtable> getAllMemtables()
+    {
+        return concat(flushingMemtables, liveMemtables);
+    }
+
+    public Sets.SetView<SSTableReader> nonCompactingSStables()
+    {
+        return Sets.difference(sstables, compacting);
+    }
+
+    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
+    {
+        return filter(candidates, new Predicate<SSTableReader>()
+        {
+            public boolean apply(SSTableReader sstable)
+            {
+                return !compacting.contains(sstable);
+            }
+        });
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
+    }
+
+    public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
+    {
+        if (intervalTree.isEmpty())
+            return Collections.emptyList();
+        RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
+        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
+    }
+
+    // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:
+
+    // return a function to un/mark the provided readers compacting in a view
+    static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark)
+    {
+        if (unmark.isEmpty() && Iterables.isEmpty(mark))
+            return Functions.identity();
+        return new Function<View, View>()
+        {
+            public View apply(View view)
+            {
+                assert all(mark, Helpers.idIn(view.sstablesMap));
+                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
+                                replace(view.compacting, unmark, mark),
+                                view.intervalTree);
+            }
+        };
+    }
+
+    // construct a predicate to reject views that do not permit us to mark these readers compacting;
+    // i.e. one of them is either already compacting, has been compacted, or has been replaced
+    static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers)
+    {
+        return new Predicate<View>()
+        {
+            public boolean apply(View view)
+            {
+                for (SSTableReader reader : readers)
+                    if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
+                        return false;
+                return true;
+            }
+        };
+    }
+
+    // construct a function to change the liveset in a Snapshot
+    static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add)
+    {
+        if (remove.isEmpty() && Iterables.isEmpty(add))
+            return Functions.identity();
+        return new Function<View, View>()
+        {
+            public View apply(View view)
+            {
+                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
+                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting,
+                                SSTableIntervalTree.build(sstableMap.keySet()));
+            }
+        };
+    }
+
+    // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest memtable
+    static Function<View, View> switchMemtable(final Memtable newMemtable)
+    {
+        return new Function<View, View>()
+        {
+            public View apply(View view)
+            {
+                List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
+                assert newLive.size() == view.liveMemtables.size() + 1;
+                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.intervalTree);
+            }
+        };
+    }
+
+    // called before flush: move toFlush from liveMemtables to flushingMemtables
+    static Function<View, View> markFlushing(final Memtable toFlush)
+    {
+        return new Function<View, View>()
+        {
+            public View apply(View view)
+            {
+                List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables;
+                List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush))));
+                List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)),
+                                                           of(toFlush),
+                                                           filter(flushing, not(lessThan(toFlush)))));
+                assert newLive.size() == live.size() - 1;
+                assert newFlushing.size() == flushing.size() + 1;
+                return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.intervalTree);
+            }
+        };
+    }
+
+    // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
+    static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
+    {
+        return new Function<View, View>()
+        {
+            public View apply(View view)
+            {
+                List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
+                assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
+
+                if (flushed == null)
+                    return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
+                                    view.compacting, view.intervalTree);
+
+                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
+                return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compacting,
+                                SSTableIntervalTree.build(sstableMap.keySet()));
+            }
+        };
+    }
+
+    private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
+    {
+        return new Predicate<T>()
+        {
+            public boolean apply(T t)
+            {
+                return t.compareTo(lessThan) < 0;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index a6c7a8b..c994a3d 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -403,7 +403,7 @@ public class CompressionMetadata
             count = chunkIndex;
         }
 
-        protected Throwable doCleanup(Throwable failed)
+        protected Throwable doPreCleanup(Throwable failed)
         {
             return offsets.close(failed);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 81e487c..9bfbc99 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -19,24 +19,15 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
+
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,8 +35,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -209,22 +202,25 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
      * Returns a Pair of all compacting and non-compacting sstables.  Non-compacting sstables will be marked as
      * compacting.
      */
-    private Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> getCompactingAndNonCompactingSSTables()
+    private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> getCompactingAndNonCompactingSSTables()
     {
         List<SSTableReader> allCompacting = new ArrayList<>();
-        Multimap<DataTracker, SSTableReader> allNonCompacting = HashMultimap.create();
+        Map<UUID, LifecycleTransaction> allNonCompacting = new HashMap<>();
         for (Keyspace ks : Keyspace.all())
         {
             for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
             {
                 Set<SSTableReader> nonCompacting, allSSTables;
+                LifecycleTransaction txn = null;
                 do
                 {
-                    allSSTables = cfStore.getDataTracker().getSSTables();
-                    nonCompacting = Sets.newHashSet(cfStore.getDataTracker().getUncompactingSSTables(allSSTables));
+                    View view = cfStore.getTracker().getView();
+                    allSSTables = view.sstables;
+                    nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables));
                 }
-                while (!(nonCompacting.isEmpty() || cfStore.getDataTracker().markCompacting(nonCompacting)));
-                allNonCompacting.putAll(cfStore.getDataTracker(), nonCompacting);
+                while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN)));
+
+                allNonCompacting.put(cfStore.metadata.cfId, txn);
                 allCompacting.addAll(Sets.difference(allSSTables, nonCompacting));
             }
         }
@@ -233,50 +229,57 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
 
     public void redistributeSummaries() throws IOException
     {
-        Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
+        Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
         try
         {
-            redistributeSummaries(compactingAndNonCompacting.left, Lists.newArrayList(compactingAndNonCompacting.right.values()), this.memoryPoolBytes);
+            redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes);
         }
         finally
         {
-            for(DataTracker tracker : compactingAndNonCompacting.right.keySet())
-                tracker.unmarkCompacting(compactingAndNonCompacting.right.get(tracker));
+            for (LifecycleTransaction modifier : compactingAndNonCompacting.right.values())
+                modifier.close();
         }
     }
 
     /**
      * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
      * their recent read rates.
-     * @param nonCompacting a list of sstables to share the memory pool across
+     * @param transactions containing the sstables we are to redistribute the memory pool across
      * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
      *                        under, if possible
      * @return a list of new SSTableReader instances
      */
     @VisibleForTesting
-    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
+    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
     {
-        long total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
-            total += sstable.getIndexSummaryOffHeapSize();
-
         List<SSTableReader> oldFormatSSTables = new ArrayList<>();
-        for (SSTableReader sstable : nonCompacting)
+        List<SSTableReader> redistribute = new ArrayList<>();
+        for (LifecycleTransaction txn : transactions.values())
         {
-            // We can't change the sampling level of sstables with the old format, because the serialization format
-            // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
-            logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
-            if (!sstable.descriptor.version.hasSamplingLevel())
-                oldFormatSSTables.add(sstable);
+            for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
+            {
+                // We can't change the sampling level of sstables with the old format, because the serialization format
+                // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
+                logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+                if (!sstable.descriptor.version.hasSamplingLevel())
+                {
+                    oldFormatSSTables.add(sstable);
+                    txn.cancel(sstable);
+                }
+            }
+            redistribute.addAll(txn.originals());
         }
-        nonCompacting.removeAll(oldFormatSSTables);
+
+        long total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
+            total += sstable.getIndexSummaryOffHeapSize();
 
         logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
-                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+                     redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
 
-        final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
+        final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
         double totalReadsPerSec = 0.0;
-        for (SSTableReader sstable : nonCompacting)
+        for (SSTableReader sstable : redistribute)
         {
             if (sstable.getReadMeter() != null)
             {
@@ -288,7 +291,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
 
         // copy and sort by read rates (ascending)
-        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+        List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
 
         long remainingBytes = memoryPoolBytes;
@@ -297,7 +300,10 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
 
         logger.trace("Index summaries for compacting SSTables are using {} MB of space",
                      (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
-        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
+        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
+
+        for (LifecycleTransaction txn : transactions.values())
+            txn.finish();
 
         total = 0;
         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
@@ -308,7 +314,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         return newSSTables;
     }
 
-    private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+    private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, Map<UUID, LifecycleTransaction> transactions,
                                                             double totalReadsPerSec, long memoryPoolCapacity) throws IOException
     {
 
@@ -410,26 +416,16 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         toDownsample.addAll(forceResample);
         toDownsample.addAll(toUpsample);
         toDownsample.addAll(forceUpsample);
-        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
-        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
         for (ResampleEntry entry : toDownsample)
         {
             SSTableReader sstable = entry.sstable;
             logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
                          sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
                          entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
-            ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+            ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
             SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
-            DataTracker tracker = cfs.getDataTracker();
-
-            replacedByTracker.put(tracker, sstable);
-            replacementsByTracker.put(tracker, replacement);
-        }
-
-        for (DataTracker tracker : replacedByTracker.keySet())
-        {
-            tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
-            newSSTables.addAll(replacementsByTracker.get(tracker));
+            newSSTables.add(replacement);
+            transactions.get(sstable.metadata.cfId).update(replacement, true);
         }
 
         return newSSTables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index db54557..cc837ba 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -23,15 +23,16 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.DataTracker;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Blocker;
 
 public class SSTableDeletingTask implements Runnable
 {
@@ -42,11 +43,12 @@ public class SSTableDeletingTask implements Runnable
     // Additionally, we need to make sure to delete the data file first, so on restart the others
     // will be recognized as GCable.
     private static final Set<SSTableDeletingTask> failedTasks = new CopyOnWriteArraySet<>();
+    private static final Blocker blocker = new Blocker();
 
     private final SSTableReader referent;
     private final Descriptor desc;
     private final Set<Component> components;
-    private DataTracker tracker;
+    private Tracker tracker;
 
     /**
      * realDescriptor is the actual descriptor for the sstable, the descriptor inside
@@ -70,13 +72,18 @@ public class SSTableDeletingTask implements Runnable
         }
     }
 
-    public void setTracker(DataTracker tracker)
+    public void setTracker(Tracker tracker)
     {
         // the tracker is used only to notify listeners of deletion of the sstable;
         // since deletion of a non-final file is not really deletion of the sstable,
         // we don't want to notify the listeners in this event
-        if (desc.type == Descriptor.Type.FINAL)
-            this.tracker = tracker;
+        assert desc.type == Descriptor.Type.FINAL;
+        this.tracker = tracker;
+    }
+
+    public Tracker getTracker()
+    {
+        return tracker;
     }
 
     public void schedule()
@@ -86,6 +93,7 @@ public class SSTableDeletingTask implements Runnable
 
     public void run()
     {
+        blocker.ask();
         long size = referent.bytesOnDisk();
 
         if (tracker != null)
@@ -119,6 +127,7 @@ public class SSTableDeletingTask implements Runnable
     }
 
     /** for tests */
+    @VisibleForTesting
     public static void waitForDeletions()
     {
         Runnable runnable = new Runnable()
@@ -130,5 +139,11 @@ public class SSTableDeletingTask implements Runnable
 
         FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
     }
+
+    @VisibleForTesting
+    public static void pauseDeletions(boolean stop)
+    {
+        blocker.block(stop);
+    }
 }