You are viewing a plain text version of this content. The canonical version is the original HTML page in the mailing-list archive (the hyperlink was lost in this plain-text rendering).
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:47:00 UTC

[6/6] cassandra git commit: Extend Transactional API to sstable lifecycle management

Extend Transactional API to sstable lifecycle management

patch by benedict; reviewed by marcus for CASSANDRA-8568


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5a76bdb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5a76bdb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5a76bdb

Branch: refs/heads/cassandra-2.2
Commit: e5a76bdb5fc04ffa16b8becaa7877186226c3b32
Parents: 33d71b8
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Mar 12 10:23:35 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri May 22 09:44:36 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 249 ++++--
 .../org/apache/cassandra/db/DataTracker.java    | 793 -------------------
 .../cassandra/db/HintedHandOffManager.java      |   2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |  15 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  22 +-
 .../compaction/AbstractCompactionStrategy.java  |   7 +-
 .../db/compaction/AbstractCompactionTask.java   |  19 +-
 .../db/compaction/CompactionController.java     |   4 +-
 .../db/compaction/CompactionManager.java        | 182 +++--
 .../cassandra/db/compaction/CompactionTask.java |  54 +-
 .../DateTieredCompactionStrategy.java           |  17 +-
 .../compaction/LeveledCompactionStrategy.java   |  23 +-
 .../db/compaction/LeveledCompactionTask.java    |  11 +-
 .../db/compaction/LeveledManifest.java          |  11 +-
 .../db/compaction/SSTableSplitter.java          |  13 +-
 .../cassandra/db/compaction/Scrubber.java       |  21 +-
 .../SizeTieredCompactionStrategy.java           |  30 +-
 .../cassandra/db/compaction/Upgrader.java       |  15 +-
 .../compaction/WrappingCompactionStrategy.java  |   2 +-
 .../writers/CompactionAwareWriter.java          |  11 +-
 .../writers/DefaultCompactionWriter.java        |  11 +-
 .../writers/MajorLeveledCompactionWriter.java   |  11 +-
 .../writers/MaxSSTableSizeWriter.java           |  10 +-
 .../SplittingSizeTieredCompactionWriter.java    |  14 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   4 +-
 .../db/index/SecondaryIndexManager.java         |   2 +-
 .../apache/cassandra/db/lifecycle/Helpers.java  | 241 ++++++
 .../db/lifecycle/LifecycleTransaction.java      | 511 ++++++++++++
 .../db/lifecycle/SSTableIntervalTree.java       |  40 +
 .../apache/cassandra/db/lifecycle/Tracker.java  | 468 +++++++++++
 .../org/apache/cassandra/db/lifecycle/View.java | 252 ++++++
 .../io/compress/CompressionMetadata.java        |   2 +-
 .../io/sstable/IndexSummaryManager.java         | 106 ++-
 .../io/sstable/SSTableDeletingTask.java         |  27 +-
 .../cassandra/io/sstable/SSTableRewriter.java   | 295 ++-----
 .../io/sstable/format/SSTableReader.java        | 100 ++-
 .../io/sstable/format/big/BigTableWriter.java   |   6 +-
 .../cassandra/io/util/SequentialWriter.java     |   2 +-
 .../cassandra/metrics/ColumnFamilyMetrics.java  |  18 +-
 .../cassandra/streaming/StreamSession.java      |   7 +-
 .../cassandra/tools/StandaloneScrubber.java     |  12 +-
 .../cassandra/tools/StandaloneSplitter.java     |   7 +-
 .../cassandra/tools/StandaloneUpgrader.java     |   6 +-
 .../cassandra/utils/concurrent/Blocker.java     |  63 ++
 .../utils/concurrent/Transactional.java         |  31 +-
 .../db/compaction/LongCompactionsTest.java      |  10 +-
 test/unit/org/apache/cassandra/MockSchema.java  | 167 ++++
 test/unit/org/apache/cassandra/Util.java        |  27 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |   3 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  58 +-
 .../db/compaction/AntiCompactionTest.java       |  51 +-
 .../compaction/CompactionAwareWriterTest.java   |  45 +-
 .../DateTieredCompactionStrategyTest.java       |   6 +-
 .../cassandra/db/lifecycle/HelpersTest.java     | 158 ++++
 .../db/lifecycle/LifecycleTransactionTest.java  | 412 ++++++++++
 .../cassandra/db/lifecycle/TrackerTest.java     | 342 ++++++++
 .../apache/cassandra/db/lifecycle/ViewTest.java | 202 +++++
 .../io/sstable/IndexSummaryManagerTest.java     | 123 ++-
 .../cassandra/io/sstable/SSTableReaderTest.java |  11 +-
 .../io/sstable/SSTableRewriterTest.java         | 250 +++---
 61 files changed, 3902 insertions(+), 1711 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8b59309..ca87385 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2
+ * Extend Transactional API to sstable lifecycle management (CASSANDRA-8568)
  * (cqlsh) Add support for native protocol 4 (CASSANDRA-9399)
  * Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
  * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4452db2..cc9b26a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -34,6 +34,10 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.lifecycle.Tracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSWriteError;
 import org.json.simple.*;
 import org.slf4j.Logger;
@@ -80,6 +84,8 @@ import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 import com.clearspring.analytics.stream.Counter;
 
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
@@ -149,12 +155,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * Memtables and SSTables on disk for this column family.
      *
-     * We synchronize on the DataTracker to ensure isolation when we want to make sure
+     * We synchronize on the Tracker to ensure isolation when we want to make sure
      * that the memtable we're acting on doesn't change out from under us.  I.e., flush
      * syncronizes on it to make sure it can submit on both executors atomically,
      * so anyone else who wants to make sure flush doesn't interfere should as well.
      */
-    private final DataTracker data;
+    private final Tracker data;
 
     /* The read order, used to track accesses to off-heap memtable storage */
     public final OpOrder readOrdering = new OpOrder();
@@ -288,13 +294,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    private ColumnFamilyStore(Keyspace keyspace,
+    public ColumnFamilyStore(Keyspace keyspace,
+                             String columnFamilyName,
+                             IPartitioner partitioner,
+                             int generation,
+                             CFMetaData metadata,
+                             Directories directories,
+                             boolean loadSSTables)
+    {
+        this(keyspace, columnFamilyName, partitioner, generation, metadata, directories, loadSSTables, true);
+    }
+
+
+    @VisibleForTesting
+    public ColumnFamilyStore(Keyspace keyspace,
                               String columnFamilyName,
                               IPartitioner partitioner,
                               int generation,
                               CFMetaData metadata,
                               Directories directories,
-                              boolean loadSSTables)
+                              boolean loadSSTables,
+                              boolean registerBookkeeping)
     {
         assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
 
@@ -315,7 +335,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         logger.info("Initializing {}.{}", keyspace.getName(), name);
 
         // scan for sstables corresponding to this cf and load them
-        data = new DataTracker(this, loadSSTables);
+        data = new Tracker(this, loadSSTables);
 
         if (data.loadsstables)
         {
@@ -343,46 +363,59 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 indexManager.addIndexedColumn(info);
         }
 
-        // register the mbean
-        String type = this.partitioner instanceof LocalPartitioner ? "IndexColumnFamilies" : "ColumnFamilies";
-        mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name;
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            ObjectName nameObj = new ObjectName(mbeanName);
-            mbs.registerMBean(this, nameObj);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-        logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
-        latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
+        if (registerBookkeeping)
         {
-            public void run()
+            // register the mbean
+            String type = this.partitioner instanceof LocalPartitioner ? "IndexColumnFamilies" : "ColumnFamilies";
+            mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name;
+            try
+            {
+                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+                ObjectName nameObj = new ObjectName(mbeanName);
+                mbs.registerMBean(this, nameObj);
+            }
+            catch (Exception e)
             {
-                SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry();
-                switch (retryPolicy.type)
+                throw new RuntimeException(e);
+            }
+            logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
+            latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
+            {
+                public void run()
                 {
-                    case PERCENTILE:
-                        // get percentile in nanos
-                        sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d);
-                        break;
-                    case CUSTOM:
-                        // convert to nanos, since configuration is in millisecond
-                        sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d);
-                        break;
-                    default:
-                        sampleLatencyNanos = Long.MAX_VALUE;
-                        break;
+                    SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry();
+                    switch (retryPolicy.type)
+                    {
+                        case PERCENTILE:
+                            // get percentile in nanos
+                            sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d);
+                            break;
+                        case CUSTOM:
+                            // convert to nanos, since configuration is in millisecond
+                            sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d);
+                            break;
+                        default:
+                            sampleLatencyNanos = Long.MAX_VALUE;
+                            break;
+                    }
                 }
-            }
-        }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
+            }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
+        }
+        else
+        {
+            latencyCalculator = ScheduledExecutors.optionalTasks.schedule(Runnables.doNothing(), 0, TimeUnit.NANOSECONDS);
+            mbeanName = null;
+        }
     }
 
     /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
     public void invalidate()
     {
+        invalidate(true);
+    }
+
+    public void invalidate(boolean expectMBean)
+    {
         // disable and cancel in-progress compactions before invalidating
         valid = false;
 
@@ -392,21 +425,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
         catch (Exception e)
         {
-            JVMStabilityInspector.inspectThrowable(e);
-            // this shouldn't block anything.
-            logger.warn("Failed unregistering mbean: {}", mbeanName, e);
+            if (expectMBean)
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+                // this shouldn't block anything.
+                logger.warn("Failed unregistering mbean: {}", mbeanName, e);
+            }
         }
 
         latencyCalculator.cancel(false);
         SystemKeyspace.removeTruncationRecord(metadata.cfId);
-        data.unreferenceSSTables();
+        data.dropSSTables();
         indexManager.invalidate();
 
         invalidateCaches();
     }
 
     /**
-     * Removes every SSTable in the directory from the DataTracker's view.
+     * Removes every SSTable in the directory from the Tracker's view.
      * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
      */
     void maybeRemoveUnreadableSSTables(File directory)
@@ -542,7 +578,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Replacing compacted sstables is atomic as far as observers of DataTracker are concerned, but not on the
+     * Replacing compacted sstables is atomic as far as observers of Tracker are concerned, but not on the
      * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then
      * their ancestors are removed.
      *
@@ -826,7 +862,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /*
      * switchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
      * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
-     * This method does not block except for synchronizing on DataTracker, but the Future it returns will
+     * This method does not block except for synchronizing on Tracker, but the Future it returns will
      * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL
      * marked clean up to the position owned by the Memtable.
      */
@@ -849,7 +885,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // reclaiming includes that which we are GC-ing;
         float onHeapRatio = 0, offHeapRatio = 0;
         long onHeapTotal = 0, offHeapTotal = 0;
-        Memtable memtable = getDataTracker().getView().getCurrentMemtable();
+        Memtable memtable = getTracker().getView().getCurrentMemtable();
         onHeapRatio +=  memtable.getAllocator().onHeap().ownershipRatio();
         offHeapRatio += memtable.getAllocator().offHeap().ownershipRatio();
         onHeapTotal += memtable.getAllocator().onHeap().owns();
@@ -859,7 +895,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             if (index.getIndexCfs() != null)
             {
-                MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+                MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator();
                 onHeapRatio += allocator.onHeap().ownershipRatio();
                 offHeapRatio += allocator.offHeap().ownershipRatio();
                 onHeapTotal += allocator.onHeap().owns();
@@ -984,7 +1020,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the DataTracker monitor.
+     * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the Tracker monitor.
      * In the constructor the current memtable(s) are swapped, and a barrier on outstanding writes is issued;
      * when run by the flushWriter the barrier is waited on to ensure all outstanding writes have completed
      * before all memtables are immediately written, and the CL is either immediately marked clean or, if
@@ -1117,7 +1153,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
                 // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only
                 // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
-                Memtable current = cfs.getDataTracker().getView().getCurrentMemtable();
+                Memtable current = cfs.getTracker().getView().getCurrentMemtable();
 
                 // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
                 // both on- and off-heap, and select the largest of the two ratios to weight this CF
@@ -1129,7 +1165,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 {
                     if (index.getIndexCfs() != null)
                     {
-                        MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+                        MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator();
                         onHeap += allocator.onHeap().ownershipRatio();
                         offHeap += allocator.offHeap().ownershipRatio();
                     }
@@ -1278,7 +1314,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (!sstables.iterator().hasNext())
             return ImmutableSet.of();
 
-        DataTracker.SSTableIntervalTree tree = data.getView().intervalTree;
+        SSTableIntervalTree tree = data.getView().intervalTree;
 
         Set<SSTableReader> results = null;
         for (SSTableReader sstable : sstables)
@@ -1454,7 +1490,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
     {
         assert !sstables.isEmpty();
-        data.markObsolete(sstables, compactionType);
+        maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
     }
 
     void replaceFlushed(Memtable memtable, SSTableReader sstable)
@@ -1473,7 +1509,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * Package protected for access from the CompactionManager.
      */
-    public DataTracker getDataTracker()
+    public Tracker getTracker()
     {
         return data;
     }
@@ -1485,7 +1521,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Set<SSTableReader> getUncompactingSSTables()
     {
-        return data.getUncompactingSSTables();
+        return data.getUncompacting();
     }
 
     public ColumnFamily getColumnFamily(DecoratedKey key,
@@ -1759,7 +1795,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return repairedSSTables;
     }
 
-    public RefViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
+    public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter)
     {
         while (true)
         {
@@ -1770,9 +1806,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public ViewFragment select(Function<DataTracker.View, List<SSTableReader>> filter)
+    public ViewFragment select(Function<View, List<SSTableReader>> filter)
     {
-        DataTracker.View view = data.getView();
+        View view = data.getView();
         List<SSTableReader> sstables = view.intervalTree.isEmpty()
                                        ? Collections.<SSTableReader>emptyList()
                                        : filter.apply(view);
@@ -1784,12 +1820,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for the given @param key, according to the interval tree
      */
-    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final DecoratedKey key)
+    public Function<View, List<SSTableReader>> viewFilter(final DecoratedKey key)
     {
         assert !key.isMinimum();
-        return new Function<DataTracker.View, List<SSTableReader>>()
+        return new Function<View, List<SSTableReader>>()
         {
-            public List<SSTableReader> apply(DataTracker.View view)
+            public List<SSTableReader> apply(View view)
             {
                 return compactionStrategyWrapper.filterSSTablesForReads(view.intervalTree.search(key));
             }
@@ -1800,17 +1836,43 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows within @param rowBounds, inclusive, according to the interval tree.
      */
-    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
+    public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
     {
-        return new Function<DataTracker.View, List<SSTableReader>>()
+        return new Function<View, List<SSTableReader>>()
         {
-            public List<SSTableReader> apply(DataTracker.View view)
+            public List<SSTableReader> apply(View view)
             {
                 return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
             }
         };
     }
 
+    /**
+     * @return a ViewFragment containing the sstables and memtables that may need to be merged
+     * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
+     */
+    public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
+    {
+        return new Function<View, List<SSTableReader>>()
+        {
+            public List<SSTableReader> apply(View view)
+            {
+                Set<SSTableReader> sstables = Sets.newHashSet();
+                for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
+                {
+                    for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                    {
+                        if (includeRepaired || !sstable.isRepaired())
+                            sstables.add(sstable);
+                    }
+                }
+
+                logger.debug("ViewFilter for {}/{} sstables", sstables.size(), getSSTables().size());
+                return ImmutableList.copyOf(sstables);
+            }
+        };
+    }
+
     public List<String> getSSTablesForKey(String key)
     {
         DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key));
@@ -2388,6 +2450,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * thread safety.  All we do is wipe the sstable containers clean, while leaving the actual
      * data files present on disk.  (This allows tests to easily call loadNewSSTables on them.)
      */
+    @VisibleForTesting
     public void clearUnsafe()
     {
         for (final ColumnFamilyStore cfs : concatWithIndexes())
@@ -2396,7 +2459,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             {
                 public Void call()
                 {
-                    cfs.data.init();
+                    cfs.data.reset();
                     return null;
                 }
             }, true);
@@ -2489,7 +2552,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 // doublecheck that we finished, instead of timing out
                 for (ColumnFamilyStore cfs : selfWithIndexes)
                 {
-                    if (!cfs.getDataTracker().getCompacting().isEmpty())
+                    if (!cfs.getTracker().getCompacting().isEmpty())
                     {
                         logger.warn("Unable to cancel in-progress compactions for {}.  Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.cfName);
                         return null;
@@ -2515,19 +2578,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public Iterable<SSTableReader> markAllCompacting()
+    public LifecycleTransaction markAllCompacting(final OperationType operationType)
     {
-        Callable<Iterable<SSTableReader>> callable = new Callable<Iterable<SSTableReader>>()
+        Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>()
         {
-            public Iterable<SSTableReader> call() throws Exception
+            public LifecycleTransaction call() throws Exception
             {
                 assert data.getCompacting().isEmpty() : data.getCompacting();
                 Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
                 if (Iterables.isEmpty(sstables))
-                    return Collections.emptyList();
-                boolean success = data.markCompacting(sstables);
-                assert success : "something marked things compacting while compactions are disabled";
-                return sstables;
+                    return null;
+                LifecycleTransaction modifier = data.tryModify(sstables, operationType);
+                assert modifier != null: "something marked things compacting while compactions are disabled";
+                return modifier;
             }
         };
 
@@ -2634,12 +2697,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public int getMeanColumns()
     {
-        return data.getMeanColumns();
+        long sum = 0;
+        long count = 0;
+        for (SSTableReader sstable : getSSTables())
+        {
+            long n = sstable.getEstimatedColumnCount().count();
+            sum += sstable.getEstimatedColumnCount().mean() * n;
+            count += n;
+        }
+        return count > 0 ? (int) (sum / count) : 0;
     }
 
     public long estimateKeys()
     {
-        return data.estimatedKeys();
+        long n = 0;
+        for (SSTableReader sstable : getSSTables())
+            n += sstable.estimatedKeys();
+        return n;
     }
 
     /** true if this CFS contains secondary index data */
@@ -2703,18 +2777,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    /**
-     * Returns the creation time of the oldest memtable not fully flushed yet.
-     */
-    public long oldestUnflushedMemtable()
-    {
-        return data.getView().getOldestMemtable().creationTime();
-    }
-
     public boolean isEmpty()
     {
-        DataTracker.View view = data.getView();
-        return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
+        View view = data.getView();
+        return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0;
     }
 
     private boolean isRowCacheEnabled()
@@ -2753,7 +2819,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public double getDroppableTombstoneRatio()
     {
-        return getDataTracker().getDroppableTombstoneRatio();
+        double allDroppable = 0;
+        long allColumns = 0;
+        int localTime = (int)(System.currentTimeMillis()/1000);
+
+        for (SSTableReader sstable : getSSTables())
+        {
+            allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds());
+            allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count();
+        }
+        return allColumns > 0 ? allDroppable / allColumns : 0;
     }
 
     public long trueSnapshotsSize()
@@ -2770,9 +2845,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     // returns the "canonical" version of any current sstable, i.e. if an sstable is being replaced and is only partially
     // visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned
     // (even if it completely replaces it)
-    public static final Function<DataTracker.View, List<SSTableReader>> CANONICAL_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>()
+    public static final Function<View, List<SSTableReader>> CANONICAL_SSTABLES = new Function<View, List<SSTableReader>>()
     {
-        public List<SSTableReader> apply(DataTracker.View view)
+        public List<SSTableReader> apply(View view)
         {
             List<SSTableReader> sstables = new ArrayList<>();
             for (SSTableReader sstable : view.compacting)
@@ -2785,9 +2860,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     };
 
-    public static final Function<DataTracker.View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>()
+    public static final Function<View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<View, List<SSTableReader>>()
     {
-        public List<SSTableReader> apply(DataTracker.View view)
+        public List<SSTableReader> apply(View view)
         {
             List<SSTableReader> sstables = new ArrayList<>();
             for (SSTableReader sstable : CANONICAL_SSTABLES.apply(view))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
deleted file mode 100644
index 36f22c5..0000000
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ /dev/null
@@ -1,793 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.File;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.*;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.notifications.*;
-import org.apache.cassandra.utils.Interval;
-import org.apache.cassandra.utils.IntervalTree;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-public class DataTracker
-{
-    private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
-
-    public final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
-    public final ColumnFamilyStore cfstore;
-    private final AtomicReference<View> view;
-
-    // Indicates if it is safe to load the initial sstables (may not be true when running in
-    //standalone processes meant to repair or upgrade sstables (e.g. standalone scrubber)
-    public final boolean loadsstables;
-
-    public DataTracker(ColumnFamilyStore cfstore, boolean loadsstables)
-    {
-        this.cfstore = cfstore;
-        this.view = new AtomicReference<>();
-        this.loadsstables = loadsstables;
-        this.init();
-    }
-
-    // get the Memtable that the ordered writeOp should be directed to
-    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
-    {
-        // since any new memtables appended to the list after we fetch it will be for operations started
-        // after us, we can safely assume that we will always find the memtable that 'accepts' us;
-        // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
-
-        // there may be multiple memtables in the list that would 'accept' us, however we only ever choose
-        // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
-        // assign operations to a memtable that was retired/queued before we started)
-        for (Memtable memtable : view.get().liveMemtables)
-        {
-            if (memtable.accepts(opGroup, replayPosition))
-                return memtable;
-        }
-        throw new AssertionError(view.get().liveMemtables.toString());
-    }
-
-    public Set<SSTableReader> getSSTables()
-    {
-        return view.get().sstables;
-    }
-
-    public Set<SSTableReader> getUncompactingSSTables()
-    {
-        return view.get().nonCompactingSStables();
-    }
-
-    public Iterable<SSTableReader> getUncompactingSSTables(Iterable<SSTableReader> candidates)
-    {
-        final View v = view.get();
-        return Iterables.filter(candidates, new Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader sstable)
-            {
-                return !v.compacting.contains(sstable);
-            }
-        });
-    }
-
-    public View getView()
-    {
-        return view.get();
-    }
-
-    /**
-     * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
-     * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
-     * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable
-     * must be followed by discarding(m), they cannot be interleaved.
-     *
-     * @return the previously active memtable
-     */
-    public Memtable switchMemtable(boolean truncating)
-    {
-        Memtable newMemtable = new Memtable(cfstore);
-        Memtable toFlushMemtable;
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            toFlushMemtable = currentView.getCurrentMemtable();
-            newView = currentView.switchMemtable(newMemtable);
-        }
-        while (!view.compareAndSet(currentView, newView));
-
-        if (truncating)
-            notifyRenewed(newMemtable);
-
-        return toFlushMemtable;
-    }
-
-    public void markFlushing(Memtable memtable)
-    {
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            newView = currentView.markFlushing(memtable);
-        }
-        while (!view.compareAndSet(currentView, newView));
-    }
-
-    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
-    {
-        // sstable may be null if we flushed batchlog and nothing needed to be retained
-
-        if (!cfstore.isValid())
-        {
-            View currentView, newView;
-            do
-            {
-                currentView = view.get();
-                newView = currentView.replaceFlushed(memtable, sstable);
-                if (sstable != null)
-                    newView = newView.replace(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
-            }
-            while (!view.compareAndSet(currentView, newView));
-            return;
-        }
-
-        // back up before creating a new View (which makes the new one eligible for compaction)
-        if (sstable != null)
-            maybeIncrementallyBackup(sstable);
-
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            newView = currentView.replaceFlushed(memtable, sstable);
-        }
-        while (!view.compareAndSet(currentView, newView));
-
-        if (sstable != null)
-        {
-            addNewSSTablesSize(Arrays.asList(sstable));
-            notifyAdded(sstable);
-        }
-    }
-
-    public void maybeIncrementallyBackup(final SSTableReader sstable)
-    {
-        if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
-            return;
-
-        File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
-        sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
-    }
-
-    /**
-     * @return true if we are able to mark the given @param sstables as compacted, before anyone else
-     *
-     * Note that we could acquire references on the marked sstables and release them in
-     * unmarkCompacting, but since we will never call markObsolete on a sstable marked
-     * as compacting (unless there is a serious bug), we can skip this.
-     */
-    public boolean markCompacting(Iterable<SSTableReader> sstables)
-    {
-        return markCompacting(sstables, false, false);
-    }
-
-    public boolean markCompacting(Iterable<SSTableReader> sstables, boolean newTables, boolean offline)
-    {
-        assert sstables != null && !Iterables.isEmpty(sstables);
-        while (true)
-        {
-            final View currentView = view.get();
-            if (Iterables.any(sstables, Predicates.in(currentView.compacting)))
-                return false;
-
-            Predicate<SSTableReader> live = new Predicate<SSTableReader>()
-            {
-                public boolean apply(SSTableReader sstable)
-                {
-                    return currentView.sstablesMap.get(sstable) == sstable && !sstable.isMarkedCompacted();
-                }
-            };
-            if (newTables)
-                assert !Iterables.any(sstables, Predicates.in(currentView.sstables));
-            else if (!offline && !Iterables.all(sstables, live))
-                return false;
-
-            View newView = currentView.markCompacting(sstables);
-            if (view.compareAndSet(currentView, newView))
-                return true;
-        }
-    }
-
-    /**
-     * Removes files from compacting status: this is different from 'markObsolete'
-     * because it should be run regardless of whether a compaction succeeded.
-     */
-    public void unmarkCompacting(Iterable<SSTableReader> unmark)
-    {
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            newView = currentView.unmarkCompacting(unmark);
-        }
-        while (!view.compareAndSet(currentView, newView));
-
-        if (!cfstore.isValid())
-        {
-            // when the CFS is invalidated, it will call unreferenceSSTables().  However, unreferenceSSTables only deals
-            // with sstables that aren't currently being compacted.  If there are ongoing compactions that finish or are
-            // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
-            unreferenceSSTables();
-        }
-    }
-
-    public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
-    {
-        removeSSTablesFromTracker(sstables);
-        releaseReferences(sstables, false);
-        notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
-    }
-
-    /**
-     *
-     * @param oldSSTables
-     * @param allReplacements
-     * @param compactionType
-     */
-    // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
-    // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
-    public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType)
-    {
-        removeSSTablesFromTracker(oldSSTables);
-        releaseReferences(oldSSTables, false);
-        notifySSTablesChanged(oldSSTables, allReplacements, compactionType);
-        addNewSSTablesSize(allReplacements);
-    }
-
-    public void addInitialSSTables(Collection<SSTableReader> sstables)
-    {
-        addSSTablesToTracker(sstables);
-        // no notifications or backup necessary
-    }
-
-    public void addSSTables(Collection<SSTableReader> sstables)
-    {
-        addSSTablesToTracker(sstables);
-        for (SSTableReader sstable : sstables)
-        {
-            maybeIncrementallyBackup(sstable);
-            notifyAdded(sstable);
-        }
-    }
-
-    /**
-     * Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance
-     *
-     * @param toReplace
-     * @param replaceWith
-     */
-    public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
-    {
-        replaceReaders(toReplace, replaceWith, true);
-    }
-
-    /**
-     * Adds the early opened files to the data tracker, but does not tell compaction strategies about it
-     *
-     * note that we dont track the live size of these sstables
-     * @param toReplace
-     * @param replaceWith
-     */
-    public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
-    {
-        for (SSTableReader s : toReplace)
-            assert s.openReason == SSTableReader.OpenReason.EARLY;
-        // note that we can replace an early opened file with a real one
-        replaceReaders(toReplace, replaceWith, false);
-    }
-
-    /**
-     * removes all sstables that are not busy compacting.
-     */
-    public void unreferenceSSTables()
-    {
-        Set<SSTableReader> notCompacting;
-
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            if (!currentView.compacting.isEmpty())
-                logger.error("Set of compacting sstables is non-empty when invalidating sstables {}", currentView.compacting);
-            notCompacting = currentView.nonCompactingSStables();
-            newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet());
-        }
-        while (!view.compareAndSet(currentView, newView));
-
-        if (notCompacting.isEmpty())
-        {
-            // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
-            return;
-        }
-        notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
-        removeOldSSTablesSize(notCompacting);
-        releaseReferences(notCompacting, true);
-    }
-
-    /**
-     * Removes every SSTable in the directory from the DataTracker's view.
-     * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
-     */
-    void removeUnreadableSSTables(File directory)
-    {
-        View currentView, newView;
-        Set<SSTableReader> remaining = new HashSet<>();
-        do
-        {
-            currentView = view.get();
-            for (SSTableReader r : currentView.nonCompactingSStables())
-                if (!r.descriptor.directory.equals(directory))
-                    remaining.add(r);
-
-            if (remaining.size() == currentView.nonCompactingSStables().size())
-                return;
-
-            newView = currentView.replace(currentView.sstables, remaining);
-        }
-        while (!view.compareAndSet(currentView, newView));
-        for (SSTableReader sstable : currentView.sstables)
-            if (!remaining.contains(sstable))
-                sstable.selfRef().release();
-        notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
-    }
-
-    /** (Re)initializes the tracker, purging all references. */
-    void init()
-    {
-        view.set(new View(
-                         ImmutableList.of(new Memtable(cfstore)),
-                         ImmutableList.<Memtable>of(),
-                         Collections.<SSTableReader, SSTableReader>emptyMap(),
-                         Collections.<SSTableReader>emptySet(),
-                         Collections.<SSTableReader>emptySet(),
-                         SSTableIntervalTree.empty()));
-    }
-
-    /**
-     * A special kind of replacement for SSTableReaders that were cloned with a new index summary sampling level (see
-     * SSTableReader.cloneWithNewSummarySamplingLevel and CASSANDRA-5519).  This does not mark the old reader
-     * as compacted.
-     * @param oldSSTables replaced readers
-     * @param newSSTables replacement readers
-     */
-    private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
-    {
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            newView = currentView.replace(oldSSTables, newSSTables);
-        }
-        while (!view.compareAndSet(currentView, newView));
-
-        if (!oldSSTables.isEmpty() && notify)
-            notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN);
-
-        for (SSTableReader sstable : newSSTables)
-            sstable.setTrackedBy(this);
-
-        Refs.release(Refs.selfRefs(oldSSTables));
-    }
-
-    private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
-    {
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList());
-        }
-        while (!view.compareAndSet(currentView, newView));
-        removeOldSSTablesSize(oldSSTables);
-    }
-
-    private void addSSTablesToTracker(Collection<SSTableReader> sstables)
-    {
-        View currentView, newView;
-        do
-        {
-            currentView = view.get();
-            newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables);
-        }
-        while (!view.compareAndSet(currentView, newView));
-        addNewSSTablesSize(sstables);
-    }
-
-    private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
-    {
-        for (SSTableReader sstable : newSSTables)
-        {
-            if (logger.isDebugEnabled())
-                logger.debug(String.format("adding %s to list of files tracked for %s.%s",
-                            sstable.descriptor, cfstore.keyspace.getName(), cfstore.name));
-            long size = sstable.bytesOnDisk();
-            StorageMetrics.load.inc(size);
-            cfstore.metric.liveDiskSpaceUsed.inc(size);
-            cfstore.metric.totalDiskSpaceUsed.inc(size);
-            sstable.setTrackedBy(this);
-        }
-    }
-
-    private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
-    {
-        for (SSTableReader sstable : oldSSTables)
-        {
-            if (logger.isDebugEnabled())
-                logger.debug(String.format("removing %s from list of files tracked for %s.%s",
-                            sstable.descriptor, cfstore.keyspace.getName(), cfstore.name));
-            long size = sstable.bytesOnDisk();
-            StorageMetrics.load.dec(size);
-            cfstore.metric.liveDiskSpaceUsed.dec(size);
-        }
-    }
-
-    private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
-    {
-        for (SSTableReader sstable : oldSSTables)
-        {
-            boolean firstToCompact = sstable.markObsolete();
-            assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
-            sstable.selfRef().release();
-        }
-    }
-
-    public void spaceReclaimed(long size)
-    {
-        cfstore.metric.totalDiskSpaceUsed.dec(size);
-    }
-
-    public long estimatedKeys()
-    {
-        long n = 0;
-        for (SSTableReader sstable : getSSTables())
-            n += sstable.estimatedKeys();
-        return n;
-    }
-
-    public int getMeanColumns()
-    {
-        long sum = 0;
-        long count = 0;
-        for (SSTableReader sstable : getSSTables())
-        {
-            long n = sstable.getEstimatedColumnCount().count();
-            sum += sstable.getEstimatedColumnCount().mean() * n;
-            count += n;
-        }
-        return count > 0 ? (int) (sum / count) : 0;
-    }
-
-    public double getDroppableTombstoneRatio()
-    {
-        double allDroppable = 0;
-        long allColumns = 0;
-        int localTime = (int)(System.currentTimeMillis()/1000);
-
-        for (SSTableReader sstable : getSSTables())
-        {
-            allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds());
-            allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count();
-        }
-        return allColumns > 0 ? allDroppable / allColumns : 0;
-    }
-
-    public void notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType)
-    {
-        INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
-        for (INotificationConsumer subscriber : subscribers)
-            subscriber.handleNotification(notification, this);
-    }
-
-    public void notifyAdded(SSTableReader added)
-    {
-        INotification notification = new SSTableAddedNotification(added);
-        for (INotificationConsumer subscriber : subscribers)
-            subscriber.handleNotification(notification, this);
-    }
-
-    public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
-    {
-        INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
-        for (INotificationConsumer subscriber : subscribers)
-            subscriber.handleNotification(notification, this);
-
-    }
-
-    public void notifyDeleting(SSTableReader deleting)
-    {
-        INotification notification = new SSTableDeletingNotification(deleting);
-        for (INotificationConsumer subscriber : subscribers)
-            subscriber.handleNotification(notification, this);
-    }
-
-    public void notifyRenewed(Memtable renewed)
-    {
-        INotification notification = new MemtableRenewedNotification(renewed);
-        for (INotificationConsumer subscriber : subscribers)
-            subscriber.handleNotification(notification, this);
-    }
-
-    public void notifyTruncated(long truncatedAt)
-    {
-        INotification notification = new TruncationNotification(truncatedAt);
-        for (INotificationConsumer subscriber : subscribers)
-            subscriber.handleNotification(notification, this);
-    }
-
-    public void subscribe(INotificationConsumer consumer)
-    {
-        subscribers.add(consumer);
-    }
-
-    public void unsubscribe(INotificationConsumer consumer)
-    {
-        subscribers.remove(consumer);
-    }
-
-    public static SSTableIntervalTree buildIntervalTree(Iterable<SSTableReader> sstables)
-    {
-        return new SSTableIntervalTree(buildIntervals(sstables));
-    }
-
-    public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
-    {
-        List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
-        for (SSTableReader sstable : sstables)
-            intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
-        return intervals;
-    }
-
-    public Set<SSTableReader> getCompacting()
-    {
-        return getView().compacting;
-    }
-
-    public static class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
-    {
-        private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);
-
-        private SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
-        {
-            super(intervals);
-        }
-
-        public static SSTableIntervalTree empty()
-        {
-            return EMPTY;
-        }
-    }
-
-    /**
-     * An immutable structure holding the current memtable, the memtables pending
-     * flush, the sstables for a column family, and the sstables that are active
-     * in compaction (a subset of the sstables).
-     */
-    public static class View
-    {
-        /**
-         * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
-         * and the new replacement memtable, until all outstanding write operations on the old table complete.
-         * The last item in the list is always the "current" memtable.
-         */
-        private final List<Memtable> liveMemtables;
-        /**
-         * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
-         * flushed. In chronologically ascending order.
-         */
-        private final List<Memtable> flushingMemtables;
-        public final Set<SSTableReader> compacting;
-        public final Set<SSTableReader> sstables;
-        // we use a Map here so that we can easily perform identity checks as well as equality checks.
-        // When marking compacting, we now  indicate if we expect the sstables to be present (by default we do),
-        // and we then check that not only are they all present in the live set, but that the exact instance present is
-        // the one we made our decision to compact against.
-        public final Map<SSTableReader, SSTableReader> sstablesMap;
-
-        // all sstables that are still in the live set, but have been completely shadowed by a replacement sstable
-        public final Set<SSTableReader> shadowed;
-        public final SSTableIntervalTree intervalTree;
-
-        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree)
-        {
-            this.shadowed = shadowed;
-            assert liveMemtables != null;
-            assert flushingMemtables != null;
-            assert sstables != null;
-            assert compacting != null;
-            assert intervalTree != null;
-
-            this.liveMemtables = liveMemtables;
-            this.flushingMemtables = flushingMemtables;
-
-            this.sstablesMap = sstables;
-            this.sstables = sstablesMap.keySet();
-            this.compacting = compacting;
-            this.intervalTree = intervalTree;
-        }
-
-        public Memtable getOldestMemtable()
-        {
-            if (!flushingMemtables.isEmpty())
-                return flushingMemtables.get(0);
-            return liveMemtables.get(0);
-        }
-
-        public Memtable getCurrentMemtable()
-        {
-            return liveMemtables.get(liveMemtables.size() - 1);
-        }
-
-        public Iterable<Memtable> getMemtablesPendingFlush()
-        {
-            if (liveMemtables.size() == 1)
-                return flushingMemtables;
-            return Iterables.concat(liveMemtables.subList(0, 1), flushingMemtables);
-        }
-
-        /**
-         * @return the active memtable and all the memtables that are pending flush.
-         */
-        public Iterable<Memtable> getAllMemtables()
-        {
-            return Iterables.concat(flushingMemtables, liveMemtables);
-        }
-
-        public Sets.SetView<SSTableReader> nonCompactingSStables()
-        {
-            return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
-        }
-
-        View switchMemtable(Memtable newMemtable)
-        {
-            List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
-            return new View(newLiveMemtables, flushingMemtables, sstablesMap, compacting, shadowed, intervalTree);
-        }
-
-        View markFlushing(Memtable toFlushMemtable)
-        {
-            List<Memtable> live = liveMemtables, flushing = flushingMemtables;
-
-            // since we can have multiple flushes queued, we may occasionally race and start a flush out of order,
-            // so must locate it in the list to remove, rather than just removing from the beginning
-            int i = live.indexOf(toFlushMemtable);
-            assert i < live.size() - 1;
-            List<Memtable> newLive = ImmutableList.<Memtable>builder()
-                                                  .addAll(live.subList(0, i))
-                                                  .addAll(live.subList(i + 1, live.size()))
-                                                  .build();
-
-            // similarly, if we out-of-order markFlushing once, we may afterwards need to insert a memtable into the
-            // flushing list in a position other than the end, though this will be rare
-            i = flushing.size();
-            while (i > 0 && flushing.get(i - 1).creationTime() > toFlushMemtable.creationTime())
-                i--;
-            List<Memtable> newFlushing = ImmutableList.<Memtable>builder()
-                                                      .addAll(flushing.subList(0, i))
-                                                      .add(toFlushMemtable)
-                                                      .addAll(flushing.subList(i, flushing.size()))
-                                                      .build();
-
-            return new View(newLive, newFlushing, sstablesMap, compacting, shadowed, intervalTree);
-        }
-
-        View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
-        {
-            int index = flushingMemtables.indexOf(flushedMemtable);
-            List<Memtable> newQueuedMemtables = ImmutableList.<Memtable>builder()
-                                                             .addAll(flushingMemtables.subList(0, index))
-                                                             .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
-                                                             .build();
-            Map<SSTableReader, SSTableReader> newSSTables = sstablesMap;
-            SSTableIntervalTree intervalTree = this.intervalTree;
-            if (newSSTable != null)
-            {
-                assert !sstables.contains(newSSTable);
-                assert !shadowed.contains(newSSTable);
-                newSSTables = ImmutableMap.<SSTableReader, SSTableReader>builder()
-                                          .putAll(sstablesMap).put(newSSTable, newSSTable).build();
-                intervalTree = buildIntervalTree(newSSTables.keySet());
-            }
-            return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, shadowed, intervalTree);
-        }
-
-        View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
-        {
-            ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
-            int newSSTablesSize = shadowed.size() + sstables.size() - oldSSTables.size() + Iterables.size(replacements);
-            assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
-            Map<SSTableReader, SSTableReader> newSSTables = new HashMap<>(newSSTablesSize);
-            Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size());
-
-            for (SSTableReader sstable : sstables)
-                if (!oldSet.contains(sstable))
-                    newSSTables.put(sstable, sstable);
-
-            for (SSTableReader sstable : shadowed)
-                if (!oldSet.contains(sstable))
-                    newShadowed.add(sstable);
-
-            for (SSTableReader replacement : replacements)
-            {
-                if (replacement.openReason == SSTableReader.OpenReason.SHADOWED)
-                    newShadowed.add(replacement);
-                else
-                    newSSTables.put(replacement, replacement);
-            }
-
-            assert newSSTables.size() + newShadowed.size() == newSSTablesSize :
-                String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
-                          newSSTablesSize, newSSTables.size() + newShadowed.size(), oldSSTables, replacements, this);
-            newShadowed = ImmutableSet.copyOf(newShadowed);
-            newSSTables = ImmutableMap.copyOf(newSSTables);
-            SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables.keySet());
-            return new View(liveMemtables, flushingMemtables, newSSTables, compacting, newShadowed, intervalTree);
-        }
-
-        View markCompacting(Iterable<SSTableReader> tomark)
-        {
-            Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
-            return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree);
-        }
-
-        View unmarkCompacting(Iterable<SSTableReader> tounmark)
-        {
-            Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
-            return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree);
-        }
-
-        @Override
-        public String toString()
-        {
-            return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
-        }
-
-        public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
-        {
-            if (intervalTree.isEmpty())
-                return Collections.emptyList();
-            RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
-            return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 589958e..df8820b 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -250,7 +250,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     protected synchronized void compact()
     {
         ArrayList<Descriptor> descriptors = new ArrayList<>();
-        for (SSTable sstable : hintStore.getDataTracker().getUncompactingSSTables())
+        for (SSTable sstable : hintStore.getTracker().getUncompacting())
             descriptors.add(sstable.descriptor);
 
         if (descriptors.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index f30bdaa..1d86784 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,7 +146,7 @@ public class Keyspace
     }
 
     /**
-     * Removes every SSTable in the directory from the appropriate DataTracker's view.
+     * Removes every SSTable in the directory from the appropriate Tracker's view.
      * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
      */
     public static void removeUnreadableSSTables(File directory)
@@ -276,6 +275,18 @@ public class Keyspace
         }
     }
 
+    private Keyspace(KSMetaData metadata)
+    {
+        this.metadata = metadata;
+        createReplicationStrategy(metadata);
+        this.metric = new KeyspaceMetrics(this);
+    }
+
+    public static Keyspace mockKS(KSMetaData metadata)
+    {
+        return new Keyspace(metadata);
+    }
+
     public void createReplicationStrategy(KSMetaData ksm)
     {
         replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index eab64ae..55b0bfe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -27,6 +27,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -46,7 +49,7 @@ import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.*;
 
-public class Memtable
+public class Memtable implements Comparable<Memtable>
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
@@ -64,6 +67,11 @@ public class Memtable
     // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
     private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
 
+    public int compareTo(Memtable that)
+    {
+        return this.minReplayPosition.compareTo(that.minReplayPosition);
+    }
+
     public static final class LastReplayPosition extends ReplayPosition
     {
         public LastReplayPosition(ReplayPosition copy) {
@@ -92,6 +100,15 @@ public class Memtable
         this.cfs.scheduleFlush();
     }
 
+    // ONLY to be used for testing, to create a mock Memtable
+    @VisibleForTesting
+    public Memtable(CFMetaData metadata)
+    {
+        this.initialComparator = metadata.comparator;
+        this.cfs = null;
+        this.allocator = null;
+    }
+
     public MemtableAllocator getAllocator()
     {
         return allocator;
@@ -107,7 +124,8 @@ public class Memtable
         return currentOperations.get();
     }
 
-    void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
+    @VisibleForTesting
+    public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
     {
         assert this.writeBarrier == null;
         this.lastReplayPosition = lastReplayPosition;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 29826b8..38107c0 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -175,9 +176,9 @@ public abstract class AbstractCompactionStrategy
      */
     public abstract AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore);
 
-    public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, final int gcBefore, long maxSSTableBytes)
+    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final int gcBefore, long maxSSTableBytes)
     {
-        return new CompactionTask(cfs, sstables, gcBefore, false);
+        return new CompactionTask(cfs, txn, gcBefore, false);
     }
 
     /**
@@ -231,7 +232,7 @@ public abstract class AbstractCompactionStrategy
      */
     public void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
-        cfs.getDataTracker().replaceFlushed(memtable, sstable);
+        cfs.getTracker().replaceFlushed(memtable, sstable);
         if (sstable != null)
             CompactionManager.instance.submitBackground(cfs);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index ac646ef..3bf224e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -24,27 +24,28 @@ import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorSt
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 
 public abstract class AbstractCompactionTask extends WrappedRunnable
 {
     protected final ColumnFamilyStore cfs;
-    protected Set<SSTableReader> sstables;
+    protected LifecycleTransaction transaction;
     protected boolean isUserDefined;
     protected OperationType compactionType;
 
     /**
      * @param cfs
-     * @param sstables must be marked compacting
+     * @param transaction the transaction managing the status of the sstables we're replacing
      */
-    public AbstractCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
+    public AbstractCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction)
     {
         this.cfs = cfs;
-        this.sstables = sstables;
+        this.transaction = transaction;
         this.isUserDefined = false;
         this.compactionType = OperationType.COMPACTION;
         // enforce contract that caller should mark sstables compacting
-        Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
-        for (SSTableReader sstable : sstables)
+        Set<SSTableReader> compacting = transaction.tracker.getCompacting();
+        for (SSTableReader sstable : transaction.originals())
             assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
     }
 
@@ -59,10 +60,10 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
         }
         finally
         {
-            cfs.getDataTracker().unmarkCompacting(sstables);
+            transaction.close();
         }
     }
-    public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables);
+    public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
 
     protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
 
@@ -80,6 +81,6 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
 
     public String toString()
     {
-        return "CompactionTask(" + sstables + ")";
+        return "CompactionTask(" + transaction + ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index a49a3ea..2292e01 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -24,6 +24,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -31,7 +33,7 @@ import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.OverlapIterator;
 import org.apache.cassandra.utils.concurrent.Refs;
 
-import static org.apache.cassandra.db.DataTracker.buildIntervals;
+import static org.apache.cassandra.db.lifecycle.SSTableIntervalTree.buildIntervals;
 
 /**
  * Manage compaction options.