Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:16 UTC

[11/20] cassandra git commit: Merge commit '849a438690aa97a361227781108cc90355dcbcd9' into cassandra-3.0

Merge commit '849a438690aa97a361227781108cc90355dcbcd9' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/78a3d2bb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/78a3d2bb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/78a3d2bb

Branch: refs/heads/cassandra-3.7
Commit: 78a3d2bba95b9efcda152a157f822f4970f22636
Parents: e9657a4 849a438
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 12 15:17:51 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 12 15:19:15 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 282 ++++++++++++-------
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 243 ++++++++--------
 .../db/commitlog/CommitLogReplayer.java         |  56 ++--
 .../cassandra/db/commitlog/ReplayPosition.java  |  93 ++++--
 .../compaction/AbstractCompactionStrategy.java  |   3 -
 .../compaction/CompactionStrategyManager.java   |   3 -
 .../apache/cassandra/db/lifecycle/Tracker.java  |  53 +++-
 .../org/apache/cassandra/db/lifecycle/View.java |  39 ++-
 .../apache/cassandra/db/view/TableViews.java    |   4 +-
 .../io/sstable/format/SSTableReader.java        |   5 -
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  12 +-
 .../io/sstable/metadata/CompactionMetadata.java |  11 +-
 .../metadata/LegacyMetadataSerializer.java      |  13 +-
 .../io/sstable/metadata/MetadataCollector.java  |  37 ++-
 .../io/sstable/metadata/StatsMetadata.java      |  39 ++-
 .../cassandra/tools/SSTableMetadataViewer.java  |   3 +-
 .../legacy_mb_clust/mb-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
 .../legacy_mb_clust/mb-1-big-Data.db            | Bin 0 -> 5342 bytes
 .../legacy_mb_clust/mb-1-big-Digest.crc32       |   1 +
 .../legacy_mb_clust/mb-1-big-Filter.db          | Bin 0 -> 24 bytes
 .../legacy_mb_clust/mb-1-big-Index.db           | Bin 0 -> 157553 bytes
 .../legacy_mb_clust/mb-1-big-Statistics.db      | Bin 0 -> 7058 bytes
 .../legacy_mb_clust/mb-1-big-Summary.db         | Bin 0 -> 47 bytes
 .../legacy_mb_clust/mb-1-big-TOC.txt            |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_mb_clust_compact/mb-1-big-Data.db    | Bin 0 -> 5383 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_clust_compact/mb-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mb_clust_compact/mb-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 7058 bytes
 .../legacy_mb_clust_compact/mb-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mb_clust_compact/mb-1-big-TOC.txt    |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../legacy_mb_clust_counter/mb-1-big-Data.db    | Bin 0 -> 4625 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_clust_counter/mb-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mb_clust_counter/mb-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 7067 bytes
 .../legacy_mb_clust_counter/mb-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mb_clust_counter/mb-1-big-TOC.txt    |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../mb-1-big-Data.db                            | Bin 0 -> 4639 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../mb-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mb-1-big-Index.db                           | Bin 0 -> 157553 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 7067 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mb-1-big-TOC.txt                            |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mb_simple/mb-1-big-Data.db           | Bin 0 -> 91 bytes
 .../legacy_mb_simple/mb-1-big-Digest.crc32      |   1 +
 .../legacy_mb_simple/mb-1-big-Filter.db         | Bin 0 -> 24 bytes
 .../legacy_mb_simple/mb-1-big-Index.db          | Bin 0 -> 26 bytes
 .../legacy_mb_simple/mb-1-big-Statistics.db     | Bin 0 -> 4611 bytes
 .../legacy_mb_simple/mb-1-big-Summary.db        | Bin 0 -> 47 bytes
 .../legacy_mb_simple/mb-1-big-TOC.txt           |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mb_simple_compact/mb-1-big-Data.db   | Bin 0 -> 91 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_simple_compact/mb-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mb_simple_compact/mb-1-big-Index.db  | Bin 0 -> 26 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 4652 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mb_simple_compact/mb-1-big-TOC.txt   |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mb_simple_counter/mb-1-big-Data.db   | Bin 0 -> 115 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_simple_counter/mb-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mb_simple_counter/mb-1-big-Index.db  | Bin 0 -> 27 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 4620 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mb_simple_counter/mb-1-big-TOC.txt   |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../mb-1-big-Data.db                            | Bin 0 -> 114 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../mb-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mb-1-big-Index.db                           | Bin 0 -> 27 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 4661 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mb-1-big-TOC.txt                            |   8 +
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  23 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../CompressedRandomAccessReaderTest.java       |   4 +-
 .../CompressedSequentialWriterTest.java         |   2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../metadata/MetadataSerializerTest.java        |  83 +++++-
 .../cassandra/io/util/MmappedRegionsTest.java   |   3 +-
 92 files changed, 715 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 306f46f,dfcad10..677ea11
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,7 +1,31 @@@
 - * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 +3.0.7
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
 + * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
  
 -2.2.7
 - * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 +3.0.6
 + * Disallow creating view with a static column (CASSANDRA-11602)
 + * Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593)
 + * Potential error replaying commitlog with smallint/tinyint/date/time types (CASSANDRA-11618)
 + * Fix queries with filtering on counter columns (CASSANDRA-11629)
 + * Improve tombstone printing in sstabledump (CASSANDRA-11655)
 + * Fix paging for range queries where all clustering columns are specified (CASSANDRA-11669)
 + * Don't require HEAP_NEW_SIZE to be set when using G1 (CASSANDRA-11600)
 + * Fix sstabledump not showing cells after tombstone marker (CASSANDRA-11654)
 + * Ignore all LocalStrategy keyspaces for streaming and other related
 +   operations (CASSANDRA-11627)
 + * Ensure columnfilter covers indexed columns for thrift 2i queries (CASSANDRA-11523)
 + * Only open one sstable scanner per sstable (CASSANDRA-11412)
 + * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410)
 + * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485)
 + * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470)
 + * Notify indexers of expired rows during compaction (CASSANDRA-11329)
 + * Properly respond with ProtocolError when a v1/v2 native protocol
 +   header is received (CASSANDRA-11464)
 + * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120)
 +Merged from 2.2:
++ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
   * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
   * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
   * Produce a heap dump when exiting on OOM (CASSANDRA-9861)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5b8de8f,88e22c0..e9a2938
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -892,10 -996,13 +903,13 @@@ public class ColumnFamilyStore implemen
          final boolean flushSecondaryIndexes;
          final OpOrder.Barrier writeBarrier;
          final CountDownLatch latch = new CountDownLatch(1);
-         final ReplayPosition lastReplayPosition;
 +        volatile FSWriteError flushFailure = null;
+         final ReplayPosition commitLogUpperBound;
+         final List<Memtable> memtables;
 -        final List<SSTableReader> readers;
 -        volatile FSWriteError flushFailure = null;
++        final List<Collection<SSTableReader>> readers;
  
-         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
+         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
 -                          List<Memtable> memtables, List<SSTableReader> readers)
++                          List<Memtable> memtables, List<Collection<SSTableReader>> readers)
          {
              this.writeBarrier = writeBarrier;
              this.flushSecondaryIndexes = flushSecondaryIndexes;
@@@ -927,18 -1046,23 +946,25 @@@
                  throw new IllegalStateException();
              }
  
-             // must check lastReplayPosition != null because Flush may find that all memtables are clean
-             // and so not set a lastReplayPosition
              // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
-             if (lastReplayPosition != null && flushFailure == null)
+             if (flushFailure == null)
              {
-                 CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
+                 CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
+                 for (int i = 0 ; i < memtables.size() ; i++)
+                 {
+                     Memtable memtable = memtables.get(i);
 -                    SSTableReader reader = readers.get(i);
++                    Collection<SSTableReader> reader = readers.get(i);
+                     memtable.cfs.data.permitCompactionOfFlushed(reader);
 -                    memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader);
++                    memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader);
+                 }
              }
 +
              metric.pendingFlushes.dec();
  
              if (flushFailure != null)
                  throw flushFailure;
++
+             return commitLogUpperBound;
          }
      }
  
@@@ -953,7 -1077,8 +979,8 @@@
      private final class Flush implements Runnable
      {
          final OpOrder.Barrier writeBarrier;
-         final List<Memtable> memtables;
+         final List<Memtable> memtables = new ArrayList<>();
 -        final List<SSTableReader> readers = new ArrayList<>();
++        final List<Collection<SSTableReader>> readers = new ArrayList<>();
          final PostFlush postFlush;
          final boolean truncate;
  
@@@ -1024,27 -1139,30 +1041,23 @@@
                  memtable.cfs.data.markFlushing(memtable);
                  if (memtable.isClean() || truncate)
                  {
-                     memtable.cfs.replaceFlushed(memtable, null);
 -                    memtable.cfs.data.replaceFlushed(memtable, null);
 -                    memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, null);
++                    memtable.cfs.data.replaceFlushed(memtable, Collections.emptyList());
++                    memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, Collections.emptyList());
                      reclaim(memtable);
                      iter.remove();
                  }
              }
  
--            if (memtables.isEmpty())
--            {
--                postFlush.latch.countDown();
--                return;
--            }
--
              metric.memtableSwitchCount.inc();
  
              try
              {
                  for (Memtable memtable : memtables)
                  {
--                    // flush the memtable
-                     MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
 -                    SSTableReader reader = memtable.flush();
 -                    memtable.cfs.data.replaceFlushed(memtable, reader);
++                    Collection<SSTableReader> readers = memtable.flush();
++                    memtable.cfs.data.replaceFlushed(memtable, readers);
                      reclaim(memtable);
 -                    readers.add(reader);
++                    this.readers.add(readers);
                  }
              }
              catch (FSWriteError e)
@@@ -1406,22 -1655,172 +1451,32 @@@
          return data;
      }
  
+     public Collection<SSTableReader> getSSTables()
+     {
+         return data.getSSTables();
+     }
+ 
+     public Iterable<SSTableReader> getPermittedToCompactSSTables()
+     {
+         return data.getPermittedToCompact();
+     }
+ 
 -    public Set<SSTableReader> getUncompactingSSTables()
 +    public Set<SSTableReader> getLiveSSTables()
      {
 -        return data.getUncompacting();
 +        return data.getView().liveSSTables();
      }
  
 -    public ColumnFamily getColumnFamily(DecoratedKey key,
 -                                        Composite start,
 -                                        Composite finish,
 -                                        boolean reversed,
 -                                        int limit,
 -                                        long timestamp)
 +    public Iterable<SSTableReader> getSSTables(SSTableSet sstableSet)
      {
 -        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
 +        return data.getView().sstables(sstableSet);
      }
  
 -    /**
 -     * Fetch the row and columns given by filter.key if it is in the cache; if not, read it from disk and cache it
 -     *
 -     * If row is cached, and the filter given is within its bounds, we return from cache, otherwise from disk
 -     *
 -     * If row is not cached, we figure out what filter is "biggest", read that from disk, then
 -     * filter the result and either cache that or return it.
 -     *
 -     * @param cfId the column family to read the row from
 -     * @param filter the columns being queried.
 -     * @return the requested data for the filter provided
 -     */
 -    private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
 +    public Iterable<SSTableReader> getUncompactingSSTables()
      {
 -        assert isRowCacheEnabled()
 -               : String.format("Row cache is not enabled on table [" + name + "]");
 -
 -        RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
 -
 -        // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
 -        // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
 -        // TODO: don't evict entire rows on writes (#2864)
 -        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
 -        if (cached != null)
 -        {
 -            if (cached instanceof RowCacheSentinel)
 -            {
 -                // Some other read is trying to cache the value, just do a normal non-caching read
 -                Tracing.trace("Row cache miss (race)");
 -                metric.rowCacheMiss.inc();
 -                return getTopLevelColumns(filter, Integer.MIN_VALUE);
 -            }
 -
 -            ColumnFamily cachedCf = (ColumnFamily)cached;
 -            if (isFilterFullyCoveredBy(filter.filter, cachedCf, filter.timestamp))
 -            {
 -                metric.rowCacheHit.inc();
 -                Tracing.trace("Row cache hit");
 -                ColumnFamily result = filterColumnFamily(cachedCf, filter);
 -                metric.updateSSTableIterated(0);
 -                return result;
 -            }
 -
 -            metric.rowCacheHitOutOfRange.inc();
 -            Tracing.trace("Ignoring row cache as cached value could not satisfy query");
 -            return getTopLevelColumns(filter, Integer.MIN_VALUE);
 -        }
 -
 -        metric.rowCacheMiss.inc();
 -        Tracing.trace("Row cache miss");
 -        RowCacheSentinel sentinel = new RowCacheSentinel();
 -        boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
 -        ColumnFamily data = null;
 -        ColumnFamily toCache = null;
 -        try
 -        {
 -            // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing
 -            if (metadata.getCaching().rowCache.cacheFullPartitions())
 -            {
 -                data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE);
 -                toCache = data;
 -                Tracing.trace("Populating row cache with the whole partition");
 -                if (sentinelSuccess && toCache != null)
 -                    CacheService.instance.rowCache.replace(key, sentinel, toCache);
 -                return filterColumnFamily(data, filter);
 -            }
 -
 -            // Otherwise, if we want to cache the result of the query we're about to do, we must make sure this query
 -            // covers what needs to be cached. And if the user filter does not satisfy that, we sometimes extend said
 -            // filter so we can populate the cache but only if:
 -            //   1) we can guarantee it is a strict extension, i.e. that we will still fetch the data asked by the user.
 -            //   2) the extension does not make us query more than getRowsPerPartitionToCache() (as a mean to limit the
 -            //      amount of extra work we'll do on a user query for the purpose of populating the cache).
 -            //
 -            // In practice, we can only guarantee those 2 points if the filter is one that queries the head of the
 -            // partition (and if that filter actually counts CQL3 rows since that's what we cache and it would be
 -            // bogus to compare the filter count to the 'rows to cache' otherwise).
 -            if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator))
 -            {
 -                SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter;
 -                int rowsToCache = metadata.getCaching().rowCache.rowsToCache;
 -
 -                SliceQueryFilter cacheSlice = readFilterForCache();
 -                QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp);
 -
 -                // If the filter count is less than the number of rows cached, we simply extend it to make sure we do cover the
 -                // number of rows to cache, and if that count is greater than the number of rows to cache, we simply filter what
 -                // needs to be cached afterwards.
 -                if (sliceFilter.count < rowsToCache)
 -                {
 -                    toCache = getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
 -                    if (toCache != null)
 -                    {
 -                        Tracing.trace("Populating row cache ({} rows cached)", cacheSlice.lastCounted());
 -                        data = filterColumnFamily(toCache, filter);
 -                    }
 -                }
 -                else
 -                {
 -                    data = getTopLevelColumns(filter, Integer.MIN_VALUE);
 -                    if (data != null)
 -                    {
 -                        // The filter limit was greater than the number of rows to cache. But, if the filter had a non-empty
 -                        // finish bound, we may have gotten less than what needs to be cached, in which case we shouldn't cache it
 -                        // (otherwise a cache hit would assume the whole partition is cached which is not the case).
 -                        if (sliceFilter.finish().isEmpty() || sliceFilter.lastCounted() >= rowsToCache)
 -                        {
 -                            toCache = filterColumnFamily(data, cacheFilter);
 -                            Tracing.trace("Caching {} rows (out of {} requested)", cacheSlice.lastCounted(), sliceFilter.count);
 -                        }
 -                        else
 -                        {
 -                            Tracing.trace("Not populating row cache, not enough rows fetched ({} fetched but {} required for the cache)", sliceFilter.lastCounted(), rowsToCache);
 -                        }
 -                    }
 -                }
 -
 -                if (sentinelSuccess && toCache != null)
 -                    CacheService.instance.rowCache.replace(key, sentinel, toCache);
 -                return data;
 -            }
 -            else
 -            {
 -                Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
 -                return getTopLevelColumns(filter, Integer.MIN_VALUE);
 -            }
 -        }
 -        finally
 -        {
 -            if (sentinelSuccess && toCache == null)
 -                invalidateCachedRow(key);
 -        }
 -    }
 -
 -    public SliceQueryFilter readFilterForCache()
 -    {
 -        // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable.
 -        return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size());
 +        return data.getUncompacting();
      }
  
 -    public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now)
 +    public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec)
      {
          // We can use the cached value only if we know that no data it doesn't contain could be covered
          // by the query filter, that is if:
@@@ -1910,40 -2759,45 +1965,52 @@@
          // position in the System keyspace.
          logger.trace("truncating {}", name);
  
-         if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
-         {
-             // flush the CF being truncated before forcing the new segment
-             forceBlockingFlush();
+         final long truncatedAt;
+         final ReplayPosition replayAfter;
  
-             viewManager.forceBlockingFlush();
- 
-             // sleep a little to make sure that our truncatedAt comes after any sstable
-             // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
-             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
-         }
-         else
+         synchronized (data)
          {
-             dumpMemtable();
-             viewManager.dumpMemtables();
 -            if (keyspace.getMetadata().durableWrites || takeSnapshot)
++            if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
+             {
+                 replayAfter = forceBlockingFlush();
++                viewManager.forceBlockingFlush();
+             }
+             else
+             {
+                 // just nuke the memtable data w/o writing to disk first
 -                final Flush flush = new Flush(true);
 -                flushExecutor.execute(flush);
 -                replayAfter = FBUtilities.waitOnFuture(postFlushExecutor.submit(flush.postFlush));
++                viewManager.dumpMemtables();
++                try
++                {
++                    replayAfter = dumpMemtable().get();
++                }
++                catch (Exception e)
++                {
++                    throw new RuntimeException(e);
++                }
+             }
+ 
+             long now = System.currentTimeMillis();
+             // make sure none of our sstables are somehow in the future (clock drift, perhaps)
+             for (ColumnFamilyStore cfs : concatWithIndexes())
+                 for (SSTableReader sstable : cfs.data.getSSTables())
+                     now = Math.max(now, sstable.maxDataAge);
+             truncatedAt = now;
          }
  
          Runnable truncateRunnable = new Runnable()
          {
              public void run()
              {
-                 logger.trace("Discarding sstable data for truncated CF + indexes");
- 
-                 final long truncatedAt = System.currentTimeMillis();
+                 logger.debug("Discarding sstable data for truncated CF + indexes");
                  data.notifyTruncated(truncatedAt);
  
 -                if (takeSnapshot)
 +                if (DatabaseDescriptor.isAutoSnapshot())
                      snapshot(Keyspace.getTimestampedSnapshotName(name));
  
-                 ReplayPosition replayAfter = discardSSTables(truncatedAt);
+                 discardSSTables(truncatedAt);
  
 -                for (SecondaryIndex index : indexManager.getIndexes())
 -                    index.truncateBlocking(truncatedAt);
 +                indexManager.truncateAllIndexesBlocking(truncatedAt);
- 
-                 viewManager.truncateBlocking(truncatedAt);
++                viewManager.truncateBlocking(replayAfter, truncatedAt);
  
                  SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
                  logger.trace("cleaning out row cache");
@@@ -1955,20 -2809,7 +2022,20 @@@
          logger.trace("truncate complete");
      }
  
 -    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
 +    /**
 +     * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
 +     */
-     public void dumpMemtable()
++    public Future<ReplayPosition> dumpMemtable()
 +    {
 +        synchronized (data)
 +        {
 +            final Flush flush = new Flush(true);
 +            flushExecutor.execute(flush);
-             postFlushExecutor.submit(flush.postFlush);
++            return postFlushExecutor.submit(flush.postFlush);
 +        }
 +    }
 +
 +    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
      {
          // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
          // and so we only run one major compaction at a time

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 5d5f7bf,b4ada09..93dc5af
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -87,12 -81,11 +90,11 @@@ public class Memtable implements Compar
          }
      }
  
 -    // We index the memtable by RowPosition only for the purpose of being able
 +    // We index the memtable by PartitionPosition only for the purpose of being able
      // to select key range using Token.KeyBound. However put() ensures that we
      // actually only store DecoratedKey.
 -    private final ConcurrentNavigableMap<RowPosition, AtomicBTreeColumns> rows = new ConcurrentSkipListMap<>();
 +    private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
      public final ColumnFamilyStore cfs;
-     private final long creationTime = System.currentTimeMillis();
      private final long creationNano = System.nanoTime();
  
      // The smallest timestamp for all partitions stored in this memtable
@@@ -101,14 -94,12 +103,16 @@@
      // Record the comparator of the CFS at the creation of the memtable. This
      // is only used when a user update the CF comparator, to know if the
      // memtable was created with the new or old comparator.
 -    public final CellNameType initialComparator;
 +    public final ClusteringComparator initialComparator;
  
 +    private final ColumnsCollector columnsCollector;
 +    private final StatsCollector statsCollector = new StatsCollector();
 +
-     public Memtable(ColumnFamilyStore cfs)
++    // only to be used by init(), to setup the very first memtable for the cfs
+     public Memtable(AtomicReference<ReplayPosition> commitLogLowerBound, ColumnFamilyStore cfs)
      {
          this.cfs = cfs;
+         this.commitLogLowerBound = commitLogLowerBound;
          this.allocator = MEMORY_POOL.newAllocator();
          this.initialComparator = cfs.metadata.comparator;
          this.cfs.scheduleFlush();
@@@ -191,12 -185,12 +200,12 @@@
  
      public boolean isClean()
      {
 -        return rows.isEmpty();
 +        return partitions.isEmpty();
      }
  
-     public boolean isCleanAfter(ReplayPosition position)
+     public boolean mayContainDataBefore(ReplayPosition position)
      {
-         return isClean() || (position != null && minReplayPosition.compareTo(position) >= 0);
+         return approximateCommitLogLowerBound.compareTo(position) < 0;
      }
  
      /**
@@@ -251,14 -256,9 +260,9 @@@
  
      public int partitionCount()
      {
 -        return rows.size();
 +        return partitions.size();
      }
  
-     public FlushRunnable flushRunnable()
-     {
-         return new FlushRunnable(lastReplayPosition.get());
-     }
- 
      public String toString()
      {
          return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
@@@ -266,54 -266,65 +270,60 @@@
                               100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
      }
  
 -    /**
 -     * @param startWith Include data in the result from and including this key and to the end of the memtable
 -     * @return An iterator of entries with the data from the start key
 -     */
 -    public Iterator<Map.Entry<DecoratedKey, ColumnFamily>> getEntryIterator(final RowPosition startWith, final RowPosition stopAt)
 +    public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
      {
 -        return new Iterator<Map.Entry<DecoratedKey, ColumnFamily>>()
 -        {
 -            private Iterator<? extends Map.Entry<? extends RowPosition, AtomicBTreeColumns>> iter = stopAt.isMinimum()
 -                    ? rows.tailMap(startWith).entrySet().iterator()
 -                    : rows.subMap(startWith, true, stopAt, true).entrySet().iterator();
 +        AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
  
 -            private Map.Entry<? extends RowPosition, ? extends ColumnFamily> currentEntry;
 +        boolean startIsMin = keyRange.left.isMinimum();
 +        boolean stopIsMin = keyRange.right.isMinimum();
  
 -            public boolean hasNext()
 -            {
 -                return iter.hasNext();
 -            }
 +        boolean isBound = keyRange instanceof Bounds;
 +        boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds;
 +        boolean includeStop = isBound || keyRange instanceof Range;
 +        Map<PartitionPosition, AtomicBTreePartition> subMap;
 +        if (startIsMin)
 +            subMap = stopIsMin ? partitions : partitions.headMap(keyRange.right, includeStop);
 +        else
 +            subMap = stopIsMin
 +                   ? partitions.tailMap(keyRange.left, includeStart)
 +                   : partitions.subMap(keyRange.left, includeStart, keyRange.right, includeStop);
  
 -            public Map.Entry<DecoratedKey, ColumnFamily> next()
 -            {
 -                Map.Entry<? extends RowPosition, ? extends ColumnFamily> entryRowPosition = iter.next();
 -                // Actual stored key should be true DecoratedKey
 -                assert entryRowPosition.getKey() instanceof DecoratedKey;
 -                @SuppressWarnings("unchecked") // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
 -                Map.Entry<DecoratedKey, ColumnFamily> entry = (Map.Entry<DecoratedKey, ColumnFamily>) entryRowPosition;
 -                if (MEMORY_POOL.needToCopyOnHeap())
 -                {
 -                    DecoratedKey key = entry.getKey();
 -                    key = new BufferDecoratedKey(key.getToken(), HeapAllocator.instance.clone(key.getKey()));
 -                    ColumnFamily cells = ArrayBackedSortedColumns.localCopy(entry.getValue(), HeapAllocator.instance);
 -                    entry = new AbstractMap.SimpleImmutableEntry<>(key, cells);
 -                }
 -                // Store the reference to the current entry so that remove() can update the current size.
 -                currentEntry = entry;
 -                return entry;
 -            }
 +        int minLocalDeletionTime = Integer.MAX_VALUE;
  
 -            public void remove()
 -            {
 -                iter.remove();
 -                liveDataSize.addAndGet(-currentEntry.getValue().dataSize());
 -                currentEntry = null;
 -            }
 -        };
 +        // avoid iterating over the memtable if we purge all tombstones
 +        if (cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
 +            minLocalDeletionTime = findMinLocalDeletionTime(subMap.entrySet().iterator());
 +
 +        final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
 +
 +        return new MemtableUnfilteredPartitionIterator(cfs, iter, isForThrift, minLocalDeletionTime, columnFilter, dataRange);
 +    }
 +
 +    private int findMinLocalDeletionTime(Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iterator)
 +    {
 +        int minLocalDeletionTime = Integer.MAX_VALUE;
 +        while (iterator.hasNext())
 +        {
 +            Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iterator.next();
 +            minLocalDeletionTime = Math.min(minLocalDeletionTime, entry.getValue().stats().minLocalDeletionTime);
 +        }
 +        return minLocalDeletionTime;
      }
  
 -    public ColumnFamily getColumnFamily(DecoratedKey key)
 +    public Partition getPartition(DecoratedKey key)
      {
 -        return rows.get(key);
 +        return partitions.get(key);
      }
  
-     public long creationTime()
 -    public SSTableReader flush()
++    public Collection<SSTableReader> flush()
      {
-         return creationTime;
+         long estimatedSize = estimatedSize();
 -        Directories.DataDirectory dataDirectory = cfs.directories.getWriteableLocation(estimatedSize);
 -        File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
++        Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
++        if (dataDirectory == null)
++            throw new RuntimeException("Insufficient disk space to write " + estimatedSize + " bytes");
++        File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
+         assert sstableDirectory != null : "Flush task is not bound to any disk";
+         return writeSortedContents(sstableDirectory);
      }
  
      public long getMinTimestamp()
@@@ -321,137 -332,86 +331,110 @@@
          return minTimestamp;
      }
  
-     class FlushRunnable extends DiskAwareRunnable
+     private long estimatedSize()
      {
-         private final ReplayPosition context;
-         private final long estimatedSize;
- 
-         private final boolean isBatchLogTable;
- 
-         FlushRunnable(ReplayPosition context)
+         long keySize = 0;
 -        for (RowPosition key : rows.keySet())
++        for (PartitionPosition key : partitions.keySet())
          {
-             this.context = context;
- 
-             long keySize = 0;
-             for (PartitionPosition key : partitions.keySet())
-             {
-                 //  make sure we don't write non-sensical keys
-                 assert key instanceof DecoratedKey;
-                 keySize += ((DecoratedKey)key).getKey().remaining();
-             }
-             estimatedSize = (long) ((keySize // index entries
-                                     + keySize // keys in data file
-                                     + liveDataSize.get()) // data
-                                     * 1.2); // bloom filter and row index overhead
- 
-             this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
-         }
- 
-         public long getExpectedWriteSize()
-         {
-             return estimatedSize;
+             //  make sure we don't write non-sensical keys
+             assert key instanceof DecoratedKey;
+             keySize += ((DecoratedKey)key).getKey().remaining();
          }
+         return (long) ((keySize // index entries
+                         + keySize // keys in data file
+                         + liveDataSize.get()) // data
+                        * 1.2); // bloom filter and row index overhead
+     }
  
-         protected void runMayThrow() throws Exception
-         {
-             long writeSize = getExpectedWriteSize();
-             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-             File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
-             assert sstableDirectory != null : "Flush task is not bound to any disk";
-             Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
-             cfs.replaceFlushed(Memtable.this, sstables);
-         }
 -    private SSTableReader writeSortedContents(File sstableDirectory)
++    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+     {
 -        logger.info("Writing {}", Memtable.this.toString());
++        boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
  
-         protected Directories getDirectories()
-         {
-             return cfs.getDirectories();
-         }
 -        SSTableReader ssTable;
 -        // errors when creating the writer that may leave empty temp files.
 -        try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
++        logger.debug("Writing {}", Memtable.this.toString());
 +
-         private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
++        Collection<SSTableReader> ssTables;
++        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
          {
-             logger.debug("Writing {}", Memtable.this.toString());
- 
-             Collection<SSTableReader> ssTables;
-             try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+             boolean trackContention = logger.isTraceEnabled();
+             int heavilyContendedRowCount = 0;
+             // (we can't clear out the map as-we-go to free up memory,
+             //  since the memtable is being used for queries in the "pending flush" category)
 -            for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
++            for (AtomicBTreePartition partition : partitions.values())
              {
-                 boolean trackContention = logger.isTraceEnabled();
-                 int heavilyContendedRowCount = 0;
-                 // (we can't clear out the map as-we-go to free up memory,
-                 //  since the memtable is being used for queries in the "pending flush" category)
-                 for (AtomicBTreePartition partition : partitions.values())
 -                AtomicBTreeColumns cf = entry.getValue();
++                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
++                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
++                // we don't need to preserve tombstones for repair. So if both operation are in this
++                // memtable (which will almost always be the case if there is no ongoing failure), we can
++                // just skip the entry (CASSANDRA-4667).
++                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
++                    continue;
++
++                if (trackContention && partition.usePessimisticLocking())
++                    heavilyContendedRowCount++;
+ 
 -                if (cf.isMarkedForDelete() && cf.hasColumns())
++                if (!partition.isEmpty())
                  {
-                     // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                     // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                     // we don't need to preserve tombstones for repair. So if both operation are in this
-                     // memtable (which will almost always be the case if there is no ongoing failure), we can
-                     // just skip the entry (CASSANDRA-4667).
-                     if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
 -                    // When every node is up, there's no reason to write batchlog data out to sstables
 -                    // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
 -                    // and BL data is strictly local, so we don't need to preserve tombstones for repair.
 -                    // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
 -                    // See CASSANDRA-4667.
 -                    if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
--                        continue;
- 
-                     if (trackContention && partition.usePessimisticLocking())
-                         heavilyContendedRowCount++;
- 
-                     if (!partition.isEmpty())
++                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
 +                    {
-                         try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                         {
-                             writer.append(iter);
-                         }
++                        writer.append(iter);
 +                    }
                  }
 -
 -                if (trackContention && cf.usePessimisticLocking())
 -                    heavilyContendedRowCount++;
 -
 -                if (!cf.isEmpty())
 -                    writer.append((DecoratedKey)entry.getKey(), cf);
+             }
  
-                 if (writer.getFilePointer() > 0)
-                 {
-                     logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                                writer.getFilename(),
-                                                FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                                context));
+             if (writer.getFilePointer() > 0)
+             {
+                 logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                            writer.getFilename(),
 -                                           FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
++                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                            commitLogUpperBound));
  
-                     // sstables should contain non-repaired data.
-                     ssTables = writer.finish(true);
-                 }
-                 else
-                 {
-                     logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                                 writer.getFilename(), context);
-                     writer.abort();
-                     ssTables = null;
-                 }
 -                // temp sstables should contain non-repaired data.
 -                ssTable = writer.finish(true);
++                // sstables should contain non-repaired data.
++                ssTables = writer.finish(true);
+             }
+             else
+             {
+                 logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                              writer.getFilename(), commitLogUpperBound);
+                 writer.abort();
 -                ssTable = null;
++                ssTables = Collections.emptyList();
+             }
  
-                 if (heavilyContendedRowCount > 0)
-                     logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+             if (heavilyContendedRowCount > 0)
 -                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString()));
++                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
  
-                 return ssTables;
-             }
 -            return ssTable;
++            return ssTables;
          }
+     }
  
-         @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
-         public SSTableTxnWriter createFlushWriter(String filename,
-                                                PartitionColumns columns,
-                                                EncodingStats stats)
 -    private SSTableWriter createFlushWriter(String filename)
++    @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
++    public SSTableTxnWriter createFlushWriter(String filename,
++                                              PartitionColumns columns,
++                                              EncodingStats stats)
+     {
 -        MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
 -                                                     .commitLogLowerBound(commitLogLowerBound.get())
 -                                                     .commitLogUpperBound(commitLogUpperBound.get());
 -        return SSTableWriter.create(Descriptor.fromFilename(filename), (long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
++        // we operate "offline" here, as we expose the resulting reader consciously when done
++        // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
++        LifecycleTransaction txn = null;
++        try
 +        {
-             // we operate "offline" here, as we expose the resulting reader consciously when done
-             // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
-             LifecycleTransaction txn = null;
-             try
-             {
-                 txn = LifecycleTransaction.offline(OperationType.FLUSH);
-                 MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-                 return new SSTableTxnWriter(txn,
-                                             cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
-                                                                          (long) partitions.size(),
-                                                                          ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                          sstableMetadataCollector,
-                                                                          new SerializationHeader(true, cfs.metadata, columns, stats),
-                                                                          txn));
-             }
-             catch (Throwable t)
-             {
-                 if (txn != null)
-                     txn.close();
-                 throw t;
-             }
++            txn = LifecycleTransaction.offline(OperationType.FLUSH);
++            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
++                                                         .commitLogLowerBound(commitLogLowerBound.get())
++                                                         .commitLogUpperBound(commitLogUpperBound.get());
++
++            return new SSTableTxnWriter(txn,
++                                        cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
++                                                                     (long) partitions.size(),
++                                                                     ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                     sstableMetadataCollector,
++                                                                     new SerializationHeader(true, cfs.metadata, columns, stats),
++                                                                     txn));
++        }
++        catch (Throwable t)
++        {
++            if (txn != null)
++                txn.close();
++            throw t;
 +        }
      }
  
      private static int estimateRowOverhead(final int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 7169b2f,a58aeb4..f45a47a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -35,9 -33,6 +35,8 @@@ import com.google.common.base.Throwable
  import com.google.common.collect.HashMultimap;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Multimap;
- import com.google.common.collect.Ordering;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +
  import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -46,18 -43,16 +45,17 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
- import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.compress.CompressionParameters;
 +import org.apache.cassandra.io.util.FileSegmentInputStream;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +import org.apache.cassandra.schema.CompressionParams;
  import org.apache.cassandra.io.compress.ICompressor;
 -import org.apache.cassandra.io.util.ByteBufferDataInput;
 -import org.apache.cassandra.io.util.FastByteArrayInputStream;
 +import org.apache.cassandra.io.util.ChannelProxy;
 +import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.FileDataInput;
 -import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.io.util.RandomAccessReader;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.CRC32Factory;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.WrappedRunnable;
@@@ -76,9 -69,9 +74,9 @@@ public class CommitLogReplaye
      private final List<Future<?>> futures;
      private final Map<UUID, AtomicInteger> invalidMutations;
      private final AtomicInteger replayedCount;
-     private final Map<UUID, ReplayPosition> cfPositions;
+     private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
      private final ReplayPosition globalPosition;
 -    private final ICRC32 checksum;
 +    private final CRC32 checksum;
      private byte[] buffer;
      private byte[] uncompressedBuffer;
  
@@@ -94,8 -87,8 +92,8 @@@
          this.invalidMutations = new HashMap<UUID, AtomicInteger>();
          // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
          this.replayedCount = new AtomicInteger();
 -        this.checksum = CRC32Factory.instance.create();
 +        this.checksum = new CRC32();
-         this.cfPositions = cfPositions;
+         this.cfPersisted = cfPersisted;
          this.globalPosition = globalPosition;
          this.replayFilter = replayFilter;
          this.archiver = commitLog.archiver;
@@@ -104,17 -97,12 +102,12 @@@
      public static CommitLogReplayer construct(CommitLog commitLog)
      {
          // compute per-CF and global replay positions
-         Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>();
-         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
+         Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
          ReplayFilter replayFilter = ReplayFilter.create();
+         ReplayPosition globalPosition = null;
          for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
          {
-             // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
-             // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
-             // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
-             ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables(SSTableSet.CANONICAL));
- 
 -            // but, if we've truncted the cf in question, then we need to need to start replay after the truncation
 +            // but, if we've truncated the cf in question, then we need to need to start replay after the truncation
              ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
              if (truncatedAt != null)
              {
@@@ -584,16 -580,12 +592,14 @@@
                  // or c) are part of a cf that was dropped.
                  // Keep in mind that the cf.name() is suspect. Do everything based on the cfid instead.
                  Mutation newMutation = null;
 -                for (ColumnFamily columnFamily : replayFilter.filter(mutation))
 +                for (PartitionUpdate update : replayFilter.filter(mutation))
                  {
 -                    if (Schema.instance.getCF(columnFamily.id()) == null)
 +                    if (Schema.instance.getCF(update.metadata().cfId) == null)
                          continue; // dropped
  
-                     ReplayPosition rp = cfPositions.get(update.metadata().cfId);
- 
 -                    if (shouldReplay(columnFamily.id(), new ReplayPosition(desc.id, entryLocation)))
 +                    // replay if current segment is newer than last flushed one or,
 +                    // if it is the last known segment, if we are after the replay position
-                     if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
++                    if (shouldReplay(update.metadata().cfId, new ReplayPosition(desc.id, entryLocation)))
                      {
                          if (newMutation == null)
                              newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
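
The hunk above replaces the old per-CF ReplayPosition comparison with a shouldReplay() call backed by ReplayPosition.ReplayFilter, so a mutation is skipped when its commit log position is already covered by data persisted in flushed sstables. A minimal, hypothetical sketch of that idea follows; Position and PersistedIntervals are illustrative stand-ins (assuming non-overlapping intervals), not the types in this patch.

    // Illustrative stand-in only; not the committed CommitLogReplayer code.
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    final class ReplayFilterSketch
    {
        // A (segment id, offset) pair, ordered the same way ReplayPosition is.
        static final class Position implements Comparable<Position>
        {
            final long segment;
            final int offset;
            Position(long segment, int offset) { this.segment = segment; this.offset = offset; }
            public int compareTo(Position o)
            {
                int c = Long.compare(segment, o.segment);
                return c != 0 ? c : Integer.compare(offset, o.offset);
            }
        }

        // Hypothetical per-table filter: one [start, end] interval per flushed sstable.
        // Assumes the intervals do not overlap, which keeps the floorEntry check correct.
        static final class PersistedIntervals
        {
            private final NavigableMap<Position, Position> covered = new TreeMap<>();

            void addFlushed(Position start, Position end) { covered.put(start, end); }

            // Replay only if no flushed interval already covers this position.
            boolean shouldReplay(Position p)
            {
                Map.Entry<Position, Position> floor = covered.floorEntry(p);
                return floor == null || floor.getValue().compareTo(p) < 0;
            }
        }

        public static void main(String[] args)
        {
            PersistedIntervals filter = new PersistedIntervals();
            filter.addFlushed(new Position(5, 0), new Position(5, 4096));    // persisted up to (5, 4096)
            System.out.println(filter.shouldReplay(new Position(5, 1000))); // false: already on disk
            System.out.println(filter.shouldReplay(new Position(6, 10)));   // true: newer segment
        }
    }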

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 28416f3,17802ad..0b21763
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@@ -17,11 -17,12 +17,11 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 -import java.io.DataInput;
  import java.io.IOException;
- import java.util.Comparator;
+ import java.util.Map;
+ import java.util.NavigableMap;
+ import java.util.TreeMap;
  
- import com.google.common.base.Function;
- import com.google.common.collect.Iterables;
  import com.google.common.collect.Ordering;
  
  import org.apache.cassandra.db.TypeSizes;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index c205d5c,575aa51..8928db5
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -237,13 -228,10 +237,10 @@@ public abstract class AbstractCompactio
       * Handle a flushed memtable.
       *
       * @param memtable the flushed memtable
 -     * @param sstable the written sstable. can be null if the memtable was clean.
 +     * @param sstables the written sstables. can be null or empty if the memtable was clean.
       */
 -    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
 +    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
      {
-         cfs.getTracker().replaceFlushed(memtable, sstables);
-         if (sstables != null && !sstables.isEmpty())
-             CompactionManager.instance.submitBackground(cfs);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 82fd872,0000000..f1127c9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,504 -1,0 +1,501 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.compaction;
 +
 +
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +
 +import com.google.common.collect.Iterables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.notifications.*;
 +import org.apache.cassandra.schema.CompactionParams;
 +import org.apache.cassandra.service.ActiveRepairService;
 +
 +/**
 + * Manages the compaction strategies.
 + *
 + * Currently has two instances of actual compaction strategies - one for repaired data and one for
 + * unrepaired data. This is done to be able to totally separate the different sets of sstables.
 + */
 +public class CompactionStrategyManager implements INotificationConsumer
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
 +    private final ColumnFamilyStore cfs;
 +    private volatile AbstractCompactionStrategy repaired;
 +    private volatile AbstractCompactionStrategy unrepaired;
 +    private volatile boolean enabled = true;
 +    public boolean isActive = true;
 +    private volatile CompactionParams params;
 +    /*
 +        We keep a copy of the schema compaction parameters here to be able to decide if we
 +        should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
 +
 +        If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
 +        we will use the new compaction parameters.
 +     */
 +    private CompactionParams schemaCompactionParams;
 +
 +    public CompactionStrategyManager(ColumnFamilyStore cfs)
 +    {
 +        cfs.getTracker().subscribe(this);
 +        logger.trace("{} subscribed to the data tracker.", this);
 +        this.cfs = cfs;
 +        reload(cfs.metadata);
 +        params = cfs.metadata.params.compaction;
 +        enabled = params.isEnabled();
 +    }
 +
 +    /**
 +     * Return the next background task
 +     *
 +     * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
 +     *
 +     */
 +    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
 +    {
 +        if (!isEnabled())
 +            return null;
 +
 +        maybeReload(cfs.metadata);
 +
 +        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
 +        {
 +            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
 +            if (repairedTask != null)
 +                return repairedTask;
 +            return unrepaired.getNextBackgroundTask(gcBefore);
 +        }
 +        else
 +        {
 +            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
 +            if (unrepairedTask != null)
 +                return unrepairedTask;
 +            return repaired.getNextBackgroundTask(gcBefore);
 +        }
 +    }
 +
 +    public boolean isEnabled()
 +    {
 +        return enabled && isActive;
 +    }
 +
 +    public synchronized void resume()
 +    {
 +        isActive = true;
 +    }
 +
 +    /**
 +     * pause compaction while we cancel all ongoing compactions
 +     *
 +     * Separate call from enable/disable to not have to save the enabled-state externally
 +      */
 +    public synchronized void pause()
 +    {
 +        isActive = false;
 +    }
 +
 +
 +    private void startup()
 +    {
 +        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        {
 +            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +        }
 +        repaired.startup();
 +        unrepaired.startup();
 +    }
 +
 +    /**
 +     * return the compaction strategy for the given sstable
 +     *
 +     * the repaired or unrepaired strategy is chosen based on the sstable's repaired status
 +     * @param sstable
 +     * @return
 +     */
 +    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
 +    {
 +        if (sstable.isRepaired())
 +            return repaired;
 +        else
 +            return unrepaired;
 +    }
 +
 +    public void shutdown()
 +    {
 +        isActive = false;
 +        repaired.shutdown();
 +        unrepaired.shutdown();
 +    }
 +
 +    public synchronized void maybeReload(CFMetaData metadata)
 +    {
 +        // compare the old schema configuration to the new one, ignore any locally set changes.
 +        if (metadata.params.compaction.equals(schemaCompactionParams))
 +            return;
 +        reload(metadata);
 +    }
 +
 +    /**
 +     * Reload the compaction strategies
 +     *
 +     * Called after changing configuration and at startup.
 +     * @param metadata
 +     */
 +    public synchronized void reload(CFMetaData metadata)
 +    {
 +        boolean disabledWithJMX = !enabled && shouldBeEnabled();
 +        setStrategy(metadata.params.compaction);
 +        schemaCompactionParams = metadata.params.compaction;
 +
 +        if (disabledWithJMX || !shouldBeEnabled())
 +            disable();
 +        else
 +            enable();
 +        startup();
 +    }
 +
 +    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
 +    {
-         cfs.getTracker().replaceFlushed(memtable, sstables);
-         if (sstables != null && !sstables.isEmpty())
-             CompactionManager.instance.submitBackground(cfs);
 +    }
 +
 +    public int getUnleveledSSTables()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int count = 0;
 +            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
 +            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
 +            return count;
 +        }
 +        return 0;
 +    }
 +
 +    public synchronized int[] getSSTableCountPerLevel()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
 +            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
 +            res = sumArrays(res, repairedCountPerLevel);
 +            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
 +            res = sumArrays(res, unrepairedCountPerLevel);
 +            return res;
 +        }
 +        return null;
 +    }
 +
 +    private static int[] sumArrays(int[] a, int[] b)
 +    {
 +        int[] res = new int[Math.max(a.length, b.length)];
 +        for (int i = 0; i < res.length; i++)
 +        {
 +            if (i < a.length && i < b.length)
 +                res[i] = a[i] + b[i];
 +            else if (i < a.length)
 +                res[i] = a[i];
 +            else
 +                res[i] = b[i];
 +        }
 +        return res;
 +    }
 +
 +    public boolean shouldDefragment()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.shouldDefragment();
 +    }
 +
 +    public Directories getDirectories()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.getDirectories();
 +    }
 +
 +    public synchronized void handleNotification(INotification notification, Object sender)
 +    {
 +        if (notification instanceof SSTableAddedNotification)
 +        {
 +            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
 +            for (SSTableReader sstable : flushedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repaired.addSSTable(sstable);
 +                else
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableListChangedNotification)
 +        {
 +            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
 +            Set<SSTableReader> repairedRemoved = new HashSet<>();
 +            Set<SSTableReader> repairedAdded = new HashSet<>();
 +            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 +            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +
 +            for (SSTableReader sstable : listChangedNotification.removed)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedRemoved.add(sstable);
 +                else
 +                    unrepairedRemoved.add(sstable);
 +            }
 +            for (SSTableReader sstable : listChangedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedAdded.add(sstable);
 +                else
 +                    unrepairedAdded.add(sstable);
 +            }
 +            if (!repairedRemoved.isEmpty())
 +            {
 +                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : repairedAdded)
 +                    repaired.addSSTable(sstable);
 +            }
 +
 +            if (!unrepairedRemoved.isEmpty())
 +            {
 +                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : unrepairedAdded)
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableRepairStatusChanged)
 +        {
 +            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
 +            {
 +                if (sstable.isRepaired())
 +                {
 +                    unrepaired.removeSSTable(sstable);
 +                    repaired.addSSTable(sstable);
 +                }
 +                else
 +                {
 +                    repaired.removeSSTable(sstable);
 +                    unrepaired.addSSTable(sstable);
 +                }
 +            }
 +        }
 +        else if (notification instanceof SSTableDeletingNotification)
 +        {
 +            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
 +            if (sstable.isRepaired())
 +                repaired.removeSSTable(sstable);
 +            else
 +                unrepaired.removeSSTable(sstable);
 +        }
 +    }
 +
 +    public void enable()
 +    {
 +        if (repaired != null)
 +            repaired.enable();
 +        if (unrepaired != null)
 +            unrepaired.enable();
 +        // enable this last to make sure the strategies are ready to get calls.
 +        enabled = true;
 +    }
 +
 +    public void disable()
 +    {
 +        // disable this first to avoid asking disabled strategies for compaction tasks
 +        enabled = false;
 +        if (repaired != null)
 +            repaired.disable();
 +        if (unrepaired != null)
 +            unrepaired.disable();
 +    }
 +
 +    /**
 +     * Create ISSTableScanner from the given sstables
 +     *
 +     * Delegates the call to the compaction strategies to allow LCS to create a scanner
 +     * @param sstables
 +     * @param ranges
 +     * @return
 +     */
 +    @SuppressWarnings("resource")
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
 +    {
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.isRepaired())
 +                repairedSSTables.add(sstable);
 +            else
 +                unrepairedSSTables.add(sstable);
 +        }
 +
 +        Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
 +        AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, ranges);
 +        AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, ranges);
 +        scanners.addAll(repairedScanners.scanners);
 +        scanners.addAll(unrepairedScanners.scanners);
 +        return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
 +    }
 +
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
 +    {
 +        return getScanners(sstables, null);
 +    }
 +
 +    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
 +    {
 +        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
 +    }
 +
 +    public long getMaxSSTableBytes()
 +    {
 +        return unrepaired.getMaxSSTableBytes();
 +    }
 +
 +    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
 +    {
 +        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
 +    }
 +
 +    public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
 +    {
 +        // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
 +        // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
 +        // sstables are marked the compactions are re-enabled
 +        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
 +        {
 +            @Override
 +            public Collection<AbstractCompactionTask> call() throws Exception
 +            {
 +                synchronized (CompactionStrategyManager.this)
 +                {
 +                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
 +                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
 +
 +                    if (repairedTasks == null && unrepairedTasks == null)
 +                        return null;
 +
 +                    if (repairedTasks == null)
 +                        return unrepairedTasks;
 +                    if (unrepairedTasks == null)
 +                        return repairedTasks;
 +
 +                    List<AbstractCompactionTask> tasks = new ArrayList<>();
 +                    tasks.addAll(repairedTasks);
 +                    tasks.addAll(unrepairedTasks);
 +                    return tasks;
 +                }
 +            }
 +        }, false, false);
 +    }
 +
 +    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
 +    {
 +        return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
 +    }
 +
 +    public int getEstimatedRemainingTasks()
 +    {
 +        int tasks = 0;
 +        tasks += repaired.getEstimatedRemainingTasks();
 +        tasks += unrepaired.getEstimatedRemainingTasks();
 +
 +        return tasks;
 +    }
 +
 +    public boolean shouldBeEnabled()
 +    {
 +        return params.isEnabled();
 +    }
 +
 +    public String getName()
 +    {
 +        return unrepaired.getName();
 +    }
 +
 +    public List<AbstractCompactionStrategy> getStrategies()
 +    {
 +        return Arrays.asList(repaired, unrepaired);
 +    }
 +
 +    public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
 +    {
 +        logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
 +        setStrategy(params);
 +        if (shouldBeEnabled())
 +            enable();
 +        else
 +            disable();
 +        startup();
 +    }
 +
 +    private void setStrategy(CompactionParams params)
 +    {
 +        if (repaired != null)
 +            repaired.shutdown();
 +        if (unrepaired != null)
 +            unrepaired.shutdown();
 +        repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
 +        unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
 +        this.params = params;
 +    }
 +
 +    public CompactionParams getCompactionParams()
 +    {
 +        return params;
 +    }
 +
 +    public boolean onlyPurgeRepairedTombstones()
 +    {
 +        return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
 +    }
 +
 +    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
 +    {
 +        if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 +        {
 +            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
 +        }
 +        else
 +        {
 +            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
 +        }
 +    }
 +}
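
The schemaCompactionParams field above lets maybeReload() compare an incoming metadata change against the last parameters seen from the schema, rather than against whatever is currently active, so a local (e.g. JMX) override survives unrelated reloads while a genuine ALTER of the compaction parameters always takes effect. A small, hypothetical sketch of that behaviour follows; ReloadSketch and its String parameters are illustrative only, not the real CompactionParams plumbing.

    // Illustrative stand-in only; not the committed CompactionStrategyManager code.
    import java.util.Objects;

    final class ReloadSketch
    {
        private String activeParams;  // what the strategies currently use
        private String schemaParams;  // last value seen from the schema

        ReloadSketch(String initial)
        {
            activeParams = initial;
            schemaParams = initial;
        }

        // Operator overrides compaction locally; schemaParams is left untouched.
        void setLocal(String params)
        {
            activeParams = params;
        }

        // Called whenever table metadata changes.
        void maybeReload(String newSchemaParams)
        {
            // Comparing against schemaParams (not activeParams) means an unrelated
            // metadata change leaves a local override in place, while a real ALTER
            // of the compaction parameters replaces it.
            if (Objects.equals(newSchemaParams, schemaParams))
                return;
            schemaParams = newSchemaParams;
            activeParams = newSchemaParams;
        }

        public static void main(String[] args)
        {
            ReloadSketch m = new ReloadSketch("STCS");
            m.setLocal("LCS");                   // local override
            m.maybeReload("STCS");               // unrelated reload: override survives
            System.out.println(m.activeParams);  // LCS
            m.maybeReload("TWCS");               // schema ALTER: new parameters win
            System.out.println(m.activeParams);  // TWCS
        }
    }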

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c09d49c,5d5701f..16090a1
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -195,10 -199,12 +199,12 @@@ public class Tracke
      public void reset()
      {
          view.set(new View(
-                          !isDummy() ? ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(),
+                          !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
+                                     : ImmutableList.<Memtable>of(),
                           ImmutableList.<Memtable>of(),
                           Collections.<SSTableReader, SSTableReader>emptyMap(),
 -                         Collections.<SSTableReader>emptySet(),
 +                         Collections.<SSTableReader, SSTableReader>emptyMap(),
+                          Collections.<SSTableReader>emptySet(),
                           SSTableIntervalTree.empty()));
      }
  
@@@ -327,10 -314,10 +332,10 @@@
          apply(View.markFlushing(memtable));
      }
  
 -    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
 +    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
      {
          assert !isDummy();
-         if (sstables == null || sstables.isEmpty())
 -        if (sstable == null)
++        if (sstables.isEmpty())
          {
              // sstables may be empty if we flushed the batchlog and nothing needed to be retained
              // if so, we don't care what state the cfstore is in, we just replace it and continue
@@@ -338,27 -325,58 +343,57 @@@
              return;
          }
  
 -        sstable.setupKeyCache();
 +        sstables.forEach(SSTableReader::setupOnline);
          // back up before creating a new Snapshot (which makes the new one eligible for compaction)
 -        maybeIncrementallyBackup(sstable);
 +        maybeIncrementallyBackup(sstables);
  
 -        apply(View.replaceFlushed(memtable, sstable));
 +        apply(View.replaceFlushed(memtable, sstables));
  
          Throwable fail;
 -        fail = updateSizeTracking(emptySet(), singleton(sstable), null);
 +        fail = updateSizeTracking(emptySet(), sstables, null);
-         // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
-         fail = notifyAdded(sstables, fail);
- 
-         if (!isDummy() && !cfstore.isValid())
-             dropSSTables();
  
          maybeFail(fail);
      }
  
+     /**
+      * permit compaction of the provided sstables; this translates to notifying compaction
+      * strategies of their existence, and potentially submitting a background task
+      */
 -    public void permitCompactionOfFlushed(SSTableReader sstable)
++    public void permitCompactionOfFlushed(Collection<SSTableReader> sstables)
+     {
 -        if (sstable == null)
++        if (sstables.isEmpty())
+             return;
+ 
 -        apply(View.permitCompactionOfFlushed(sstable));
++        apply(View.permitCompactionOfFlushed(sstables));
+ 
+         if (isDummy())
+             return;
+ 
+         if (cfstore.isValid())
+         {
 -            notifyAdded(sstable);
++            notifyAdded(sstables);
+             CompactionManager.instance.submitBackground(cfstore);
+         }
+         else
+         {
+             dropSSTables();
+         }
+     }
  
  
 -
      // MISCELLANEOUS public utility calls
  
+     public Set<SSTableReader> getSSTables()
+     {
+         return view.get().sstables;
+     }
+ 
+     public Iterable<SSTableReader> getPermittedToCompact()
+     {
+         View view = this.view.get();
+         return filter(view.sstables, not(in(view.premature)));
+     }
+ 
      public Set<SSTableReader> getCompacting()
      {
          return view.get().compacting;