Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:49:11 UTC

[20/23] cassandra git commit: Merge commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0' into cassandra-3.9

Merge commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0' into cassandra-3.9

* commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0':
  Change commitlog and sstables to track dirty and clean intervals.
  Disable passing control to post-flush after flush failure to prevent data loss.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7b102173
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7b102173
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7b102173

Branch: refs/heads/trunk
Commit: 7b1021733b55c8865f80e261697b4c079d086633
Parents: 21c92ca 904cb5d
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Aug 5 15:39:15 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 5 15:39:56 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/BlacklistedDirectories.java    |  13 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  70 +---
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  21 +-
 .../AbstractCommitLogSegmentManager.java        |   4 +-
 .../cassandra/db/commitlog/CommitLog.java       |  11 +-
 .../db/commitlog/CommitLogReplayer.java         | 105 ++----
 .../db/commitlog/CommitLogSegment.java          |  82 +++--
 .../cassandra/db/commitlog/IntervalSet.java     | 192 +++++++++++
 .../compaction/AbstractCompactionStrategy.java  |   3 +
 .../compaction/CompactionStrategyManager.java   |   3 +
 .../apache/cassandra/db/lifecycle/Tracker.java  |  45 +--
 .../org/apache/cassandra/db/lifecycle/View.java |  37 +--
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  12 +-
 .../metadata/LegacyMetadataSerializer.java      |  17 +-
 .../io/sstable/metadata/MetadataCollector.java  |  37 +--
 .../io/sstable/metadata/StatsMetadata.java      |  44 +--
 .../cassandra/tools/SSTableMetadataViewer.java  |   3 +-
 .../apache/cassandra/utils/IntegerInterval.java | 227 +++++++++++++
 .../legacy_mc_clust/mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
 .../legacy_mc_clust/mc-1-big-Data.db            | Bin 0 -> 5355 bytes
 .../legacy_mc_clust/mc-1-big-Digest.crc32       |   1 +
 .../legacy_mc_clust/mc-1-big-Filter.db          | Bin 0 -> 24 bytes
 .../legacy_mc_clust/mc-1-big-Index.db           | Bin 0 -> 157553 bytes
 .../legacy_mc_clust/mc-1-big-Statistics.db      | Bin 0 -> 7086 bytes
 .../legacy_mc_clust/mc-1-big-Summary.db         | Bin 0 -> 47 bytes
 .../legacy_mc_clust/mc-1-big-TOC.txt            |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_mc_clust_compact/mc-1-big-Data.db    | Bin 0 -> 5382 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_clust_compact/mc-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mc_clust_compact/mc-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7086 bytes
 .../legacy_mc_clust_compact/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_compact/mc-1-big-TOC.txt    |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../legacy_mc_clust_counter/mc-1-big-Data.db    | Bin 0 -> 4631 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_clust_counter/mc-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mc_clust_counter/mc-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7095 bytes
 .../legacy_mc_clust_counter/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_counter/mc-1-big-TOC.txt    |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../mc-1-big-Data.db                            | Bin 0 -> 4625 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../mc-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db                           | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7095 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt                            |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple/mc-1-big-Data.db           | Bin 0 -> 89 bytes
 .../legacy_mc_simple/mc-1-big-Digest.crc32      |   1 +
 .../legacy_mc_simple/mc-1-big-Filter.db         | Bin 0 -> 24 bytes
 .../legacy_mc_simple/mc-1-big-Index.db          | Bin 0 -> 26 bytes
 .../legacy_mc_simple/mc-1-big-Statistics.db     | Bin 0 -> 4639 bytes
 .../legacy_mc_simple/mc-1-big-Summary.db        | Bin 0 -> 47 bytes
 .../legacy_mc_simple/mc-1-big-TOC.txt           |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple_compact/mc-1-big-Data.db   | Bin 0 -> 91 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_simple_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_compact/mc-1-big-Index.db  | Bin 0 -> 26 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4680 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mc_simple_compact/mc-1-big-TOC.txt   |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple_counter/mc-1-big-Data.db   | Bin 0 -> 110 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_simple_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_counter/mc-1-big-Index.db  | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4648 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mc_simple_counter/mc-1-big-TOC.txt   |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../mc-1-big-Data.db                            | Bin 0 -> 114 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../mc-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db                           | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4689 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt                            |   8 +
 .../db/commitlog/CommitLogStressTest.java       |   3 +-
 test/unit/org/apache/cassandra/Util.java        |  21 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |  12 +-
 .../apache/cassandra/cql3/OutOfSpaceTest.java   |  33 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 151 ++++++++-
 .../cassandra/db/compaction/NeverPurgeTest.java |   6 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  12 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |   4 +-
 .../metadata/MetadataSerializerTest.java        |  16 +-
 .../cassandra/utils/IntegerIntervalsTest.java   | 326 +++++++++++++++++++
 97 files changed, 1222 insertions(+), 369 deletions(-)
----------------------------------------------------------------------
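
For readers skimming the diff: the heart of this change is the pair of new utilities listed above, IntervalSet and IntegerInterval. Instead of a single "everything before position X has been flushed" low-water mark, the commit log and sstables now record the exact intervals of commit log positions that are clean (covered by a flushed sstable) versus still dirty. The toy demo below only illustrates that idea -- positions are reduced to longs and a Guava RangeSet stands in for the patch's own IntervalSet:

    import com.google.common.collect.Range;
    import com.google.common.collect.RangeSet;
    import com.google.common.collect.TreeRangeSet;

    public class DirtyCleanIntervalsDemo
    {
        public static void main(String[] args)
        {
            RangeSet<Long> clean = TreeRangeSet.create();

            clean.add(Range.closed(0L, 100L));    // first flush covered positions 0..100
            clean.add(Range.closed(200L, 300L));  // a later flush covered 200..300; 101..199 failed to flush

            System.out.println(clean.contains(50L));   // true  -> already persisted, can be skipped on replay
            System.out.println(clean.contains(150L));  // false -> still dirty, must be replayed
            System.out.println(clean.encloses(Range.closed(0L, 300L))); // false -> segment not yet unused
        }
    }

A position inside a clean interval can safely be skipped on replay, while the hole left by a failed or skipped flush keeps the corresponding segments dirty and replayable.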


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 289f370,b596fc9..43d28f3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
 -3.0.9
 +3.9
 + * Fix nodetool tablestats missing SSTable count (CASSANDRA-12205)
 + * Fixed flaky SSTablesIteratedTest (CASSANDRA-12282)
 + * Fixed flaky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
 + * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
 + * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
 +Merged from 3.0:
+  * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
   * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
   * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
   * Fix upgrade of super columns on thrift (CASSANDRA-12335)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9d31b60,82604e2..21becfe
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -957,29 -922,19 +954,20 @@@ public class ColumnFamilyStore implemen
          final boolean flushSecondaryIndexes;
          final OpOrder.Barrier writeBarrier;
          final CountDownLatch latch = new CountDownLatch(1);
-         final CommitLogPosition commitLogUpperBound;
 -        volatile FSWriteError flushFailure = null;
 +        volatile Throwable flushFailure = null;
          final List<Memtable> memtables;
-         final List<Collection<SSTableReader>> readers;
  
 -        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
 +        private PostFlush(boolean flushSecondaryIndexes,
 +                          OpOrder.Barrier writeBarrier,
-                           CommitLogPosition commitLogUpperBound,
-                           List<Memtable> memtables,
-                           List<Collection<SSTableReader>> readers)
+                           List<Memtable> memtables)
          {
              this.writeBarrier = writeBarrier;
              this.flushSecondaryIndexes = flushSecondaryIndexes;
-             this.commitLogUpperBound = commitLogUpperBound;
              this.memtables = memtables;
-             this.readers = readers;
          }
  
 -        public ReplayPosition call()
 +        public CommitLogPosition call()
          {
-             if (discardFlushResults == ColumnFamilyStore.this)
-                 return commitLogUpperBound;
- 
              writeBarrier.await();
  
              /**
@@@ -1003,19 -958,13 +991,13 @@@
                  throw new IllegalStateException();
              }
  
-             // Must check commitLogUpperBound != null because Flush may find that all memtables are clean
-             // and so not set a commitLogUpperBound
 -            ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
++            CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE;
              // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
-             if (flushFailure == null)
+             if (flushFailure == null && !memtables.isEmpty())
              {
-                 CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
-                 for (int i = 0 ; i < memtables.size() ; i++)
-                 {
-                     Memtable memtable = memtables.get(i);
-                     Collection<SSTableReader> reader = readers.get(i);
-                     memtable.cfs.data.permitCompactionOfFlushed(reader);
-                     memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader);
-                 }
+                 Memtable memtable = memtables.get(0);
+                 commitLogUpperBound = memtable.getCommitLogUpperBound();
+                 CommitLog.instance.discardCompletedSegments(metadata.cfId, memtable.getCommitLogLowerBound(), commitLogUpperBound);
              }
  
              metric.pendingFlushes.dec();
@@@ -1079,9 -1027,9 +1060,9 @@@
  
              // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
              // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
 -            // replay positions have also completed, i.e. the memtables are done and ready to flush
 +            // commit log segment position have also completed, i.e. the memtables are done and ready to flush
              writeBarrier.issue();
-             postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers);
+             postFlush = new PostFlush(!truncate, writeBarrier, memtables);
          }
  
          public void run()
@@@ -1111,110 -1059,23 +1092,108 @@@
              try
              {
                  for (Memtable memtable : memtables)
--                {
-                     this.readers.add(flushMemtable(memtable));
 -                    Collection<SSTableReader> readers = memtable.flush();
 -                    memtable.cfs.replaceFlushed(memtable, readers);
 -                    reclaim(memtable);
--                }
++                    flushMemtable(memtable);
              }
 -            catch (FSWriteError e)
 +            catch (Throwable t)
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
 -                postFlush.flushFailure = e;
 +                JVMStabilityInspector.inspectThrowable(t);
 +                postFlush.flushFailure = t;
              }
 -
              // signal the post-flush we've done our work
              postFlush.latch.countDown();
          }
  
 +        public Collection<SSTableReader> flushMemtable(Memtable memtable)
 +        {
 +            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
 +            long totalBytesOnDisk = 0;
 +            long maxBytesOnDisk = 0;
 +            long minBytesOnDisk = Long.MAX_VALUE;
 +            List<SSTableReader> sstables = new ArrayList<>();
 +            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
 +            {
 +                List<Memtable.FlushRunnable> flushRunnables = null;
 +                List<SSTableMultiWriter> flushResults = null;
 +
 +                try
 +                {
 +                    // flush the memtable
 +                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
 +                    flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
 +                }
 +                catch (Throwable t)
 +                {
 +                    t = memtable.abortRunnables(flushRunnables, t);
 +                    t = txn.abort(t);
 +                    throw Throwables.propagate(t);
 +                }
 +
 +                try
 +                {
 +                    Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
 +                    while (writerIterator.hasNext())
 +                    {
 +                        @SuppressWarnings("resource")
 +                        SSTableMultiWriter writer = writerIterator.next();
 +                        if (writer.getFilePointer() > 0)
 +                        {
 +                            writer.setOpenResult(true).prepareToCommit();
 +                        }
 +                        else
 +                        {
 +                            maybeFail(writer.abort(null));
 +                            writerIterator.remove();
 +                        }
 +                    }
 +                }
 +                catch (Throwable t)
 +                {
 +                    for (SSTableMultiWriter writer : flushResults)
 +                        t = writer.abort(t);
 +                    t = txn.abort(t);
 +                    Throwables.propagate(t);
 +                }
 +
 +                txn.prepareToCommit();
 +
 +                Throwable accumulate = null;
 +                for (SSTableMultiWriter writer : flushResults)
 +                    accumulate = writer.commit(accumulate);
 +
 +                maybeFail(txn.commit(accumulate));
 +
 +                for (SSTableMultiWriter writer : flushResults)
 +                {
 +                    Collection<SSTableReader> flushedSSTables = writer.finished();
 +                    for (SSTableReader sstable : flushedSSTables)
 +                    {
 +                        if (sstable != null)
 +                        {
 +                            sstables.add(sstable);
 +                            long size = sstable.bytesOnDisk();
 +                            totalBytesOnDisk += size;
 +                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
 +                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
 +            }
-             memtable.cfs.data.replaceFlushed(memtable, sstables);
++            memtable.cfs.replaceFlushed(memtable, sstables);
 +            reclaim(memtable);
 +            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
 +            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
 +                         sstables,
 +                         sstables.size(),
 +                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
 +            return sstables;
 +        }
 +
          private void reclaim(final Memtable memtable)
          {
              // issue a read barrier for reclaiming the memory, and offload the wait to another thread
@@@ -2268,10 -2085,10 +2222,10 @@@
      {
          Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>()
          {
 -            public LifecycleTransaction call() throws Exception
 +            public LifecycleTransaction call()
              {
                  assert data.getCompacting().isEmpty() : data.getCompacting();
-                 Iterable<SSTableReader> sstables = getPermittedToCompactSSTables();
+                 Iterable<SSTableReader> sstables = getLiveSSTables();
                  sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
                  sstables = ImmutableList.copyOf(sstables);
                  LifecycleTransaction modifier = data.tryModify(sstables, operationType);

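The flushMemtable() method added above fans a memtable out into one flush runnable per data directory, waits for all of them, and only then commits the lifecycle transaction (aborting everything if any writer fails). A rough, simplified sketch of that fan-out, with illustrative names -- the real code routes runnable i to perDiskflushExecutors[i] and folds failures into the transaction abort:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    final class PerDiskFlushSketch
    {
        static <T> List<T> submitAndWait(List<Callable<T>> perDiskRunnables, ExecutorService[] perDiskExecutors)
                throws InterruptedException, ExecutionException
        {
            List<Future<T>> futures = new ArrayList<>();
            for (int i = 0; i < perDiskRunnables.size(); i++)
                futures.add(perDiskExecutors[i % perDiskExecutors.length].submit(perDiskRunnables.get(i)));

            List<T> results = new ArrayList<>();
            for (Future<T> f : futures)
                results.add(f.get());   // blocks; the first failure surfaces as an ExecutionException
            return results;
        }
    }
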
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 7a46d8a,3c77092..e9cca4a
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -33,7 -33,9 +33,9 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
 -import org.apache.cassandra.db.commitlog.ReplayPosition;
+ import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.filter.ClusteringIndexFilter;
  import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -192,6 -194,11 +194,11 @@@ public class Memtable implements Compar
          return commitLogLowerBound.get();
      }
  
 -    public ReplayPosition getCommitLogUpperBound()
++    public CommitLogPosition getCommitLogUpperBound()
+     {
+         return commitLogUpperBound.get();
+     }
+ 
      public boolean isLive()
      {
          return allocator.isLive();
@@@ -361,63 -337,39 +368,72 @@@
          return minTimestamp;
      }
  
+     /**
+      * For testing only. Give this memtable too big a size to make it always fail flushing.
+      */
+     @VisibleForTesting
+     public void makeUnflushable()
+     {
+         liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
+     }
+ 
 -    private long estimatedSize()
 +    class FlushRunnable implements Callable<SSTableMultiWriter>
      {
 -        long keySize = 0;
 -        for (PartitionPosition key : partitions.keySet())
 +        private final long estimatedSize;
 +        private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
 +
 +        private final boolean isBatchLogTable;
 +        private final SSTableMultiWriter writer;
 +
 +        // keeping these to be able to log what we are actually flushing
 +        private final PartitionPosition from;
 +        private final PartitionPosition to;
 +
 +        FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
          {
 -            //  make sure we don't write non-sensical keys
 -            assert key instanceof DecoratedKey;
 -            keySize += ((DecoratedKey)key).getKey().remaining();
 +            this(partitions.subMap(from, to), flushLocation, from, to, txn);
          }
 -        return (long) ((keySize // index entries
 -                        + keySize // keys in data file
 -                        + liveDataSize.get()) // data
 -                       * 1.2); // bloom filter and row index overhead
 -    }
  
 -    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
 -    {
 -        boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 +        FlushRunnable(LifecycleTransaction txn)
 +        {
 +            this(partitions, null, null, null, txn);
 +        }
 +
 +        FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
 +        {
 +            this.toFlush = toFlush;
 +            this.from = from;
 +            this.to = to;
 +            long keySize = 0;
 +            for (PartitionPosition key : toFlush.keySet())
 +            {
 +                //  make sure we don't write non-sensical keys
 +                assert key instanceof DecoratedKey;
 +                keySize += ((DecoratedKey) key).getKey().remaining();
 +            }
 +            estimatedSize = (long) ((keySize // index entries
 +                                    + keySize // keys in data file
 +                                    + liveDataSize.get()) // data
 +                                    * 1.2); // bloom filter and row index overhead
 +
 +            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
  
 -        logger.debug("Writing {}", Memtable.this.toString());
 +            if (flushLocation == null)
 +                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());
 +            else
 +                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
 +
 +        }
  
 -        Collection<SSTableReader> ssTables;
 -        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
 +        protected Directories getDirectories()
          {
 +            return cfs.getDirectories();
 +        }
 +
 +        private void writeSortedContents()
 +        {
 +            logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
 +
              boolean trackContention = logger.isTraceEnabled();
              int heavilyContendedRowCount = 0;
              // (we can't clear out the map as-we-go to free up memory,
@@@ -444,39 -396,58 +460,38 @@@
                  }
              }
  
 -            if (writer.getFilePointer() > 0)
 -            {
 -                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
 -                                           writer.getFilename(),
 -                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
 -                                           commitLogUpperBound));
 -
 -                // sstables should contain non-repaired data.
 -                ssTables = writer.finish(true);
 -            }
 -            else
 -            {
 -                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
 -                             writer.getFilename(), commitLogUpperBound);
 -                writer.abort();
 -                ssTables = Collections.emptyList();
 -            }
 +            long bytesFlushed = writer.getFilePointer();
 +            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
 +                                                                              writer.getFilename(),
 +                                                                              FBUtilities.prettyPrintMemory(bytesFlushed),
 +                                                                              commitLogUpperBound));
 +            // Update the metrics
 +            cfs.metric.bytesFlushed.inc(bytesFlushed);
  
              if (heavilyContendedRowCount > 0)
 -                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 -
 -            return ssTables;
 +                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
          }
 -    }
  
 -    @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
 -    public SSTableTxnWriter createFlushWriter(String filename,
 -                                              PartitionColumns columns,
 -                                              EncodingStats stats)
 -    {
 -        // we operate "offline" here, as we expose the resulting reader consciously when done
 -        // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
 -        LifecycleTransaction txn = null;
 -        try
 +        public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
 +                                                  String filename,
 +                                                  PartitionColumns columns,
 +                                                  EncodingStats stats)
          {
 -            txn = LifecycleTransaction.offline(OperationType.FLUSH);
              MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
-                     .commitLogLowerBound(commitLogLowerBound.get())
-                     .commitLogUpperBound(commitLogUpperBound.get());
 -                    .commitLogIntervals(new IntervalSet(commitLogLowerBound.get(), commitLogUpperBound.get()));
 -
 -            return new SSTableTxnWriter(txn,
 -                                        cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
 -                                                                     (long) partitions.size(),
 -                                                                     ActiveRepairService.UNREPAIRED_SSTABLE,
 -                                                                     sstableMetadataCollector,
 -                                                                     new SerializationHeader(true, cfs.metadata, columns, stats),
 -                                                                     txn));
++                    .commitLogIntervals(new IntervalSet<>(commitLogLowerBound.get(), commitLogUpperBound.get()));
++
 +            return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
 +                                                (long)toFlush.size(),
 +                                                ActiveRepairService.UNREPAIRED_SSTABLE,
 +                                                sstableMetadataCollector,
 +                                                new SerializationHeader(true, cfs.metadata, columns, stats), txn);
- 
          }
 -        catch (Throwable t)
 +
 +        @Override
 +        public SSTableMultiWriter call()
          {
 -            if (txn != null)
 -                txn.close();
 -            throw t;
 +            writeSortedContents();
 +            return writer;
          }
      }
  

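Each FlushRunnable above is handed a subMap view of the memtable's sorted partition map, so the per-directory writers cover disjoint key ranges without copying any data. A small standalone demo of that splitting, with keys simplified to Strings (the patch passes PartitionPosition boundaries chosen per data directory and runs each view on that directory's flush executor):

    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class MemtableRangeSplitDemo
    {
        public static void main(String[] args)
        {
            ConcurrentNavigableMap<String, String> partitions = new ConcurrentSkipListMap<>();
            partitions.put("apple", "row data");
            partitions.put("mango", "row data");
            partitions.put("zebra", "row data");

            // subMap is a live, zero-copy view over [from, to) of the underlying map
            ConcurrentNavigableMap<String, String> disk1 = partitions.subMap("a", "n");
            ConcurrentNavigableMap<String, String> disk2 = partitions.subMap("n", "zz");

            System.out.println(disk1.keySet()); // [apple, mango]
            System.out.println(disk2.keySet()); // [zebra]
        }
    }
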
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7ea7439,0000000..8f3b7e4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,582 -1,0 +1,582 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterables;
 +import com.google.common.util.concurrent.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
 +
 +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 +
 +/**
 + * Performs eager-creation of commit log segments in a background thread. All the
 + * public methods are thread safe.
 + */
 +public abstract class AbstractCommitLogSegmentManager
 +{
 +    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
 +
 +    // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
 +    private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
 +
 +    /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
 +    private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
 +
 +    /** Active segments, containing unflushed data */
 +    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
 +
 +    /** The segment we are currently allocating commit log records to */
 +    protected volatile CommitLogSegment allocatingFrom = null;
 +
 +    private final WaitQueue hasAvailableSegments = new WaitQueue();
 +
 +    final String storageDirectory;
 +
 +    /**
 +     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
 +     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
 +     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
 +     * on the manager thread, which will take a ms or two).
 +     */
 +    private final AtomicLong size = new AtomicLong();
 +
 +    /**
 +     * New segment creation is initially disabled because we'll typically get some "free" segments
 +     * recycled after log replay.
 +     */
 +    volatile boolean createReserveSegments = false;
 +
 +    private Thread managerThread;
 +    protected volatile boolean run = true;
 +    protected final CommitLog commitLog;
 +
 +    private static final SimpleCachedBufferPool bufferPool =
 +        new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
 +
 +    AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
 +    {
 +        this.commitLog = commitLog;
 +        this.storageDirectory = storageDirectory;
 +    }
 +
 +    void start()
 +    {
 +        // The run loop for the manager thread
 +        Runnable runnable = new WrappedRunnable()
 +        {
 +            public void runMayThrow() throws Exception
 +            {
 +                while (run)
 +                {
 +                    try
 +                    {
 +                        Runnable task = segmentManagementTasks.poll();
 +                        if (task == null)
 +                        {
 +                            // if we have no more work to do, check if we should create a new segment
 +                            if (!atSegmentLimit() &&
 +                                availableSegments.isEmpty() &&
 +                                (activeSegments.isEmpty() || createReserveSegments))
 +                            {
 +                                logger.trace("No segments in reserve; creating a fresh one");
 +                                // TODO : some error handling in case we fail to create a new segment
 +                                availableSegments.add(createSegment());
 +                                hasAvailableSegments.signalAll();
 +                            }
 +
 +                            // flush old Cfs if we're full
 +                            long unused = unusedCapacity();
 +                            if (unused < 0)
 +                            {
 +                                List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
 +                                long spaceToReclaim = 0;
 +                                for (CommitLogSegment segment : activeSegments)
 +                                {
 +                                    if (segment == allocatingFrom)
 +                                        break;
 +                                    segmentsToRecycle.add(segment);
 +                                    spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
 +                                    if (spaceToReclaim + unused >= 0)
 +                                        break;
 +                                }
 +                                flushDataFrom(segmentsToRecycle, false);
 +                            }
 +
 +                            // Since we're operating on a "null" allocation task, block here for the next task on the
 +                            // queue rather than looping, grabbing another null, and repeating the above work.
 +                            try
 +                            {
 +                                task = segmentManagementTasks.take();
 +                            }
 +                            catch (InterruptedException e)
 +                            {
 +                                throw new AssertionError();
 +                            }
 +                        }
 +                        task.run();
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        JVMStabilityInspector.inspectThrowable(t);
 +                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
 +                            return;
 +                        // sleep some arbitrary period to avoid spamming CL
 +                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 +                    }
 +                }
 +            }
 +
 +            private boolean atSegmentLimit()
 +            {
 +                return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
 +            }
 +        };
 +
 +        run = true;
 +
 +        managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
 +        managerThread.start();
 +    }
 +
 +
 +    /**
 +     * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything.
 +     */
 +    public abstract void shutdown();
 +
 +    /**
 +     * Allocate a segment within this CLSM. Should either succeed or throw.
 +     */
 +    public abstract Allocation allocate(Mutation mutation, int size);
 +
 +    /**
 +     * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
 +     * decide what to do with those segments on disk after they've been replayed.
 +     */
 +    abstract void handleReplayedSegment(final File file);
 +
 +    /**
 +     * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as a task
 +     * submitted to the segment manager so it's performed on the segment management thread.
 +     */
 +    abstract CommitLogSegment createSegment();
 +
 +    /**
 +     * Indicates that a segment file has been flushed and is no longer needed. Only perform as a task submitted to the
 +     * segment manager so it's performed on the segment management thread, or perform while the segment management
 +     * thread is shut down during testing resets.
 +     *
 +     * @param segment segment to be discarded
 +     * @param delete  whether or not the segment is safe to be deleted.
 +     */
 +    abstract void discard(CommitLogSegment segment, boolean delete);
 +
 +
 +    /**
 +     * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator
 +     * is working on initial allocation of a CommitLogSegment.
 +     */
 +    CommitLogSegment allocatingFrom()
 +    {
 +        CommitLogSegment r = allocatingFrom;
 +        if (r == null)
 +        {
 +            advanceAllocatingFrom(null);
 +            r = allocatingFrom;
 +        }
 +        return r;
 +    }
 +
 +    /**
 +     * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it.
 +     * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled.
 +     *
 +     * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
 +     */
 +    protected void advanceAllocatingFrom(CommitLogSegment old)
 +    {
 +        while (true)
 +        {
 +            CommitLogSegment next;
 +            synchronized (this)
 +            {
 +                // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
 +                // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432
 +                if (allocatingFrom != old)
 +                    return;
 +                next = availableSegments.poll();
 +                if (next != null)
 +                {
 +                    allocatingFrom = next;
 +                    activeSegments.add(next);
 +                }
 +            }
 +
 +            if (next != null)
 +            {
 +                if (old != null)
 +                {
 +                    // Now we can run the user defined command just after switching to the new commit log.
 +                    // (Do this here instead of in the recycle call so we can get a head start on the archive.)
 +                    commitLog.archiver.maybeArchive(old);
 +
 +                    // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
 +                    old.discardUnusedTail();
 +                }
 +
 +                // request that the CL be synced out-of-band, as we've finished a segment
 +                commitLog.requestExtraSync();
 +                return;
 +            }
 +
 +            // no more segments, so register to receive a signal when not empty
 +            WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
 +
 +            // trigger the management thread; this must occur after registering
 +            // the signal to ensure we are woken by any new segment creation
 +            wakeManager();
 +
 +            // check if the queue has already been added to before waiting on the signal, to catch modifications
 +            // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
 +            if (!availableSegments.isEmpty() || allocatingFrom != old)
 +            {
 +                signal.cancel();
 +                // if we've been beaten, just stop immediately
 +                if (allocatingFrom != old)
 +                    return;
 +                // otherwise try again, as there should be an available segment
 +                continue;
 +            }
 +
 +            // can only reach here if the queue hasn't been inserted into
 +            // before we registered the signal, as we only remove items from the queue
 +            // after updating allocatingFrom. Can safely block until we are signalled
 +            // by the allocator that new segments have been published
 +            signal.awaitUninterruptibly();
 +        }
 +    }
 +
 +    protected void wakeManager()
 +    {
 +        // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
 +        segmentManagementTasks.add(Runnables.doNothing());
 +    }
 +
 +    /**
 +     * Switch to a new segment, regardless of how much is left in the current one.
 +     *
 +     * Flushes any dirty CFs for this segment and any older segments, and then recycles
 +     * the segments
 +     */
 +    void forceRecycleAll(Iterable<UUID> droppedCfs)
 +    {
 +        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
 +        CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
 +        advanceAllocatingFrom(last);
 +
 +        // wait for the commit log modifications
 +        last.waitForModifications();
 +
 +        // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
 +        // on the relevant keyspaces to complete
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
 +        Future<?> future = flushDataFrom(segmentsToRecycle, true);
 +        try
 +        {
 +            future.get();
 +
 +            for (CommitLogSegment segment : activeSegments)
 +                for (UUID cfId : droppedCfs)
-                     segment.markClean(cfId, segment.getCurrentCommitLogPosition());
++                    segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +
 +            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
 +            // if the previous active segment was the only one to recycle (since an active segment isn't
 +            // necessarily dirty, and we only call dCS after a flush).
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment.isUnused())
 +                    recycleSegment(segment);
 +            }
 +
 +            CommitLogSegment first;
 +            if ((first = activeSegments.peek()) != null && first.id <= last.id)
 +                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
 +        }
 +        catch (Throwable t)
 +        {
 +            // for now just log the error
 +            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
 +        }
 +    }
 +
 +    /**
 +     * Indicates that a segment is no longer in use and that it should be recycled.
 +     *
 +     * @param segment segment that is no longer in use
 +     */
 +    void recycleSegment(final CommitLogSegment segment)
 +    {
 +        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
 +        if (activeSegments.remove(segment))
 +        {
 +            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
 +            discardSegment(segment, archiveSuccess);
 +        }
 +        else
 +        {
 +            logger.warn("segment {} not found in activeSegments queue", segment);
 +        }
 +    }
 +
 +    /**
 +     * Indicates that a segment file should be deleted.
 +     *
 +     * @param segment segment to be discarded
 +     */
 +    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
 +    {
 +        logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
 +        segmentManagementTasks.add(() -> discard(segment, deleteFile));
 +    }
 +
 +    /**
 +     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
 +     * @param addedSize
 +     */
 +    void addSize(long addedSize)
 +    {
 +        size.addAndGet(addedSize);
 +    }
 +
 +    /**
 +     * @return the space (in bytes) used by all segment files.
 +     */
 +    public long onDiskSize()
 +    {
 +        return size.get();
 +    }
 +
 +    private long unusedCapacity()
 +    {
 +        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
 +        long currentSize = size.get();
 +        logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
 +        return total - currentSize;
 +    }
 +
 +    /**
 +     * @param name the filename to check
 +     * @return true if file is managed by this manager.
 +     */
 +    public boolean manages(String name)
 +    {
 +        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
 +            if (segment.getName().equals(name))
 +                return true;
 +        return false;
 +    }
 +
 +    /**
 +     * Sets a flag that enables the behavior of keeping at least one spare segment
 +     * available at all times.
 +     */
 +    void enableReserveSegmentCreation()
 +    {
 +        createReserveSegments = true;
 +        wakeManager();
 +    }
 +
 +    /**
 +     * Force a flush on all CFs that are still dirty in @param segments.
 +     *
 +     * @return a Future that will finish when all the flushes are complete.
 +     */
 +    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
 +    {
 +        if (segments.isEmpty())
 +            return Futures.immediateFuture(null);
 +        final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
 +
 +        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
 +        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
 +
 +        for (CommitLogSegment segment : segments)
 +        {
 +            for (UUID dirtyCFId : segment.getDirtyCFIDs())
 +            {
 +                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
 +                if (pair == null)
 +                {
 +                    // even though we remove the schema entry before a final flush when dropping a CF,
 +                    // it's still possible for a writer to race and finish his append after the flush.
 +                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
-                     segment.markClean(dirtyCFId, segment.getCurrentCommitLogPosition());
++                    segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +                }
 +                else if (!flushes.containsKey(dirtyCFId))
 +                {
 +                    String keyspace = pair.left;
 +                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
 +                    // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
 +                    // no deadlock possibility since switchLock removal
 +                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
 +                }
 +            }
 +        }
 +
 +        return Futures.allAsList(flushes.values());
 +    }
 +
 +    /**
 +     * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
 +     * Only call this after the AbstractCommitLogService is shut down.
 +     */
 +    public void stopUnsafe(boolean deleteSegments)
 +    {
 +        logger.trace("CLSM closing and clearing existing commit log segments...");
 +        createReserveSegments = false;
 +
 +        awaitManagementTasksCompletion();
 +
 +        shutdown();
 +        try
 +        {
 +            awaitTermination();
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        synchronized (this)
 +        {
 +            for (CommitLogSegment segment : activeSegments)
 +                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
 +            activeSegments.clear();
 +
 +            for (CommitLogSegment segment : availableSegments)
 +                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
 +            availableSegments.clear();
 +        }
 +
 +        allocatingFrom = null;
 +
 +        segmentManagementTasks.clear();
 +
 +        size.set(0L);
 +
 +        logger.trace("CLSM done with closing and clearing existing commit log segments.");
 +    }
 +
 +    // Used by tests only.
 +    void awaitManagementTasksCompletion()
 +    {
 +        while (!segmentManagementTasks.isEmpty())
 +            Thread.yield();
 +        // The last management task is not yet complete. Wait a while for it.
 +        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +        // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
 +        // waiting completes correctly.
 +    }
 +
 +    /**
 +     * Explicitly for use only during resets in unit testing.
 +     */
 +    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
 +    {
 +        try
 +        {
 +            discard(segment, delete);
 +        }
 +        catch (AssertionError ignored)
 +        {
 +            // segment file does not exist
 +        }
 +    }
 +
 +    /**
 +     * Returns when the management thread terminates.
 +     */
 +    public void awaitTermination() throws InterruptedException
 +    {
 +        managerThread.join();
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            segment.close();
 +
 +        for (CommitLogSegment segment : availableSegments)
 +            segment.close();
 +
 +        bufferPool.shutdown();
 +    }
 +
 +    /**
 +     * @return a read-only collection of the active commit log segments
 +     */
 +    @VisibleForTesting
 +    public Collection<CommitLogSegment> getActiveSegments()
 +    {
 +        return Collections.unmodifiableCollection(activeSegments);
 +    }
 +
 +    /**
 +     * @return the current CommitLogPosition of the active segment we're allocating from
 +     */
 +    CommitLogPosition getCurrentPosition()
 +    {
 +        return allocatingFrom().getCurrentCommitLogPosition();
 +    }
 +
 +    /**
 +     * Forces a disk flush on the commit log files that need it.  Blocking.
 +     */
 +    public void sync(boolean syncAllSegments) throws IOException
 +    {
 +        CommitLogSegment current = allocatingFrom();
 +        for (CommitLogSegment segment : getActiveSegments())
 +        {
 +            if (!syncAllSegments && segment.id > current.id)
 +                return;
 +            segment.sync();
 +        }
 +    }
 +
 +    /**
 +     * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
 +     */
 +    SimpleCachedBufferPool getBufferPool()
 +    {
 +        return bufferPool;
 +    }
 +}
 +

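flushDataFrom() above queues at most one flush per dirty table, even when several old segments name the same table, and the caller waits on the combined future. A simplified sketch of that deduplication, with CompletableFuture standing in for the ListenableFuture/Futures.allAsList used by the real code:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    final class FlushDedupSketch
    {
        static CompletableFuture<Void> flushDirtyTables(List<Set<UUID>> dirtyCfIdsPerSegment,
                                                        Function<UUID, CompletableFuture<Void>> forceFlush)
        {
            Map<UUID, CompletableFuture<Void>> flushes = new LinkedHashMap<>();
            for (Set<UUID> dirty : dirtyCfIdsPerSegment)
                for (UUID cfId : dirty)
                    flushes.computeIfAbsent(cfId, forceFlush);   // at most one flush per table

            return CompletableFuture.allOf(flushes.values().toArray(new CompletableFuture[0]));
        }
    }
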
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index b66221c,dfe3f91..32f69eb
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -298,20 -290,21 +298,21 @@@ public class CommitLog implements Commi
       * given. Discards any commit log segments that are no longer used.
       *
       * @param cfId    the column family ID that was flushed
-      * @param context the commit log segment position of the flush
+      * @param lowerBound the lowest covered replay position of the flush
+      * @param upperBound the highest covered replay position of the flush
       */
-     public void discardCompletedSegments(final UUID cfId, final CommitLogPosition context)
 -    public void discardCompletedSegments(final UUID cfId, final ReplayPosition lowerBound, final ReplayPosition upperBound)
++    public void discardCompletedSegments(final UUID cfId, final CommitLogPosition lowerBound, final CommitLogPosition upperBound)
      {
-         logger.trace("discard completed log segments for {}, table {}", context, cfId);
+         logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId);
  
          // Go thru the active segment files, which are ordered oldest to newest, marking the
 -        // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
 +        // flushed CF as clean, until we reach the segment file containing the CommitLogPosition passed
          // in the arguments. Any segments that become unused after they are marked clean will be
          // recycled or discarded.
 -        for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
 +        for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();)
          {
              CommitLogSegment segment = iter.next();
-             segment.markClean(cfId, context);
+             segment.markClean(cfId, lowerBound, upperBound);
  
              if (segment.isUnused())
              {

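With markClean() now taking a lower and an upper bound, each segment effectively keeps, per table, the interval over which it was dirtied and the set of intervals already persisted by flushes; the segment becomes unused (and recyclable) once the dirty extent is fully enclosed by the clean intervals. A conceptual sketch with positions reduced to longs and a Guava RangeSet in place of the patch's IntegerInterval sets:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    import com.google.common.collect.Range;
    import com.google.common.collect.RangeSet;
    import com.google.common.collect.TreeRangeSet;

    final class SegmentTrackingSketch
    {
        // per table: the extent of positions written into this segment
        private final Map<UUID, Range<Long>> dirtyExtent = new HashMap<>();
        // per table: positions of this segment already persisted by flushed sstables
        private final Map<UUID, RangeSet<Long>> clean = new HashMap<>();

        void markDirty(UUID cfId, long position)
        {
            dirtyExtent.merge(cfId, Range.singleton(position), Range::span);
        }

        void markClean(UUID cfId, long lowerBound, long upperBound)
        {
            clean.computeIfAbsent(cfId, k -> TreeRangeSet.create())
                 .add(Range.closed(lowerBound, upperBound));
        }

        // the segment can be recycled once every table's dirty extent is covered by clean intervals
        boolean isUnused()
        {
            for (Map.Entry<UUID, Range<Long>> e : dirtyExtent.entrySet())
            {
                RangeSet<Long> persisted = clean.get(e.getKey());
                if (persisted == null || !persisted.encloses(e.getValue()))
                    return false;
            }
            return true;
        }
    }
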
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index c8e597f,af8efb4..92364c8
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -54,32 -70,31 +54,32 @@@ public class CommitLogReplayer implemen
      static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
      private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
      private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
 -    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
  
 -    private final Set<Keyspace> keyspacesRecovered;
 -    private final List<Future<?>> futures;
 -    private final Map<UUID, AtomicInteger> invalidMutations;
 +    private final Set<Keyspace> keyspacesReplayed;
 +    private final Queue<Future<Integer>> futures;
 +
      private final AtomicInteger replayedCount;
-     private final Map<UUID, ReplayPositionFilter> cfPersisted;
 -    private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted;
 -    private final ReplayPosition globalPosition;
 -    private final CRC32 checksum;
 -    private byte[] buffer;
 -    private byte[] uncompressedBuffer;
++    private final Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted;
 +    private final CommitLogPosition globalPosition;
 +
 +    // Used to throttle speed of replay of mutations if we pass the max outstanding count
 +    private long pendingMutationBytes = 0;
  
      private final ReplayFilter replayFilter;
      private final CommitLogArchiver archiver;
  
 -    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter)
 +    @VisibleForTesting
 +    protected CommitLogReader commitLogReader;
 +
 +    CommitLogReplayer(CommitLog commitLog,
 +                      CommitLogPosition globalPosition,
-                       Map<UUID, ReplayPositionFilter> cfPersisted,
++                      Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted,
 +                      ReplayFilter replayFilter)
      {
 -        this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
 -        this.futures = new ArrayList<Future<?>>();
 -        this.buffer = new byte[4096];
 -        this.uncompressedBuffer = new byte[4096];
 -        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
 +        this.keyspacesReplayed = new NonBlockingHashSet<Keyspace>();
 +        this.futures = new ArrayDeque<Future<Integer>>();
          // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
          this.replayedCount = new AtomicInteger();
 -        this.checksum = new CRC32();
          this.cfPersisted = cfPersisted;
          this.globalPosition = globalPosition;
          this.replayFilter = replayFilter;
@@@ -89,10 -103,9 +89,10 @@@
  
      public static CommitLogReplayer construct(CommitLog commitLog)
      {
-         // compute per-CF and global commit log segment positions
-         Map<UUID, ReplayPositionFilter> cfPersisted = new HashMap<>();
+         // compute per-CF and global replay intervals
 -        Map<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<>();
++        Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted = new HashMap<>();
          ReplayFilter replayFilter = ReplayFilter.create();
-         CommitLogPosition globalPosition = null;
++
          for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
          {
              // but, if we've truncated the cf in question, then we need to start replay after the truncation
@@@ -117,15 -130,11 +117,11 @@@
                  }
              }
  
-             ReplayPositionFilter filter = new ReplayPositionFilter(cfs.getSSTables(), truncatedAt);
-             if (!filter.isEmpty())
-                 cfPersisted.put(cfs.metadata.cfId, filter);
-             else
-                 globalPosition = CommitLogPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
 -            IntervalSet<ReplayPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
++            IntervalSet<CommitLogPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
+             cfPersisted.put(cfs.metadata.cfId, filter);
          }
-         if (globalPosition == null)
-             globalPosition = firstNotCovered(cfPersisted.values());
-         logger.trace("Global commit log segment position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
 -        ReplayPosition globalPosition = firstNotCovered(cfPersisted.values());
++        CommitLogPosition globalPosition = firstNotCovered(cfPersisted.values());
+         logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
          return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
      }
  
@@@ -174,135 -208,38 +170,105 @@@
          return replayedCount.get();
      }
  
 -    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
 +    /*
 +     * Wrapper around initiating mutations read from the log to make it possible
 +     * to spy on initiated mutations for test
 +     */
 +    @VisibleForTesting
 +    public static class MutationInitiator
      {
 -        if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
 -        {
 -            // There was no room in the segment to write a final header. No data could be present here.
 -            return -1;
 -        }
 -        reader.seek(offset);
 -        CRC32 crc = new CRC32();
 -        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
 -        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
 -        updateChecksumInt(crc, (int) reader.getPosition());
 -        int end = reader.readInt();
 -        long filecrc = reader.readInt() & 0xffffffffL;
 -        if (crc.getValue() != filecrc)
 +        protected Future<Integer> initiateMutation(final Mutation mutation,
 +                                                   final long segmentId,
 +                                                   final int serializedSize,
 +                                                   final int entryLocation,
 +                                                   final CommitLogReplayer commitLogReplayer)
          {
 -            if (end != 0 || filecrc != 0)
 +            Runnable runnable = new WrappedRunnable()
              {
 -                handleReplayError(false,
 -                                  "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
 -                                  "The end of segment marker should be zero.",
 -                                  offset, reader.getPath());
 -            }
 -            return -1;
 -        }
 -        else if (end < offset || end > reader.length())
 -        {
 -            handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
 -                              offset, reader.getPath());
 -            return -1;
 +                public void runMayThrow()
 +                {
 +                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 +                        return;
 +                    if (commitLogReplayer.pointInTimeExceeded(mutation))
 +                        return;
 +
 +                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +
 +                    // Rebuild the mutation, omitting column families that
 +                    //    a) the user has requested that we ignore,
 +                    //    b) have already been flushed,
 +                    // or c) are part of a cf that was dropped.
 +                    // Keep in mind that the cf.name() is suspect; do everything based on the cfid instead.
 +                    Mutation newMutation = null;
 +                    for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
 +                    {
 +                        if (Schema.instance.getCF(update.metadata().cfId) == null)
 +                            continue; // dropped
 +
 +                        // replay if current segment is newer than last flushed one or,
 +                        // if it is the last known segment, if we are after the commit log segment position
 +                        if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation)))
 +                        {
 +                            if (newMutation == null)
 +                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 +                            newMutation.add(update);
 +                            commitLogReplayer.replayedCount.incrementAndGet();
 +                        }
 +                    }
 +                    if (newMutation != null)
 +                    {
 +                        assert !newMutation.isEmpty();
 +
 +                        try
 +                        {
 +                            Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
 +                        }
 +                        catch (ExecutionException e)
 +                        {
 +                            throw Throwables.propagate(e.getCause());
 +                        }
 +
 +                        commitLogReplayer.keyspacesReplayed.add(keyspace);
 +                    }
 +                }
 +            };
 +            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
          }
 -        return end;
 +    }
 +
 +    /**
-      * A filter of known safe-to-discard commit log replay positions, based on
++     * A set of known safe-to-discard commit log replay positions, based on
 +     * the range covered by on disk sstables and those prior to the most recent truncation record
 +     */
-     public static class ReplayPositionFilter
++    public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
 +    {
-         final NavigableMap<CommitLogPosition, CommitLogPosition> persisted = new TreeMap<>();
-         public ReplayPositionFilter(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
-         {
-             for (SSTableReader reader : onDisk)
-             {
-                 CommitLogPosition start = reader.getSSTableMetadata().commitLogLowerBound;
-                 CommitLogPosition end = reader.getSSTableMetadata().commitLogUpperBound;
-                 add(persisted, start, end);
-             }
-             if (truncatedAt != null)
-                 add(persisted, CommitLogPosition.NONE, truncatedAt);
-         }
++        IntervalSet.Builder<CommitLogPosition> builder = new IntervalSet.Builder<>();
++        for (SSTableReader reader : onDisk)
++            builder.addAll(reader.getSSTableMetadata().commitLogIntervals);
 +
-         private static void add(NavigableMap<CommitLogPosition, CommitLogPosition> ranges, CommitLogPosition start, CommitLogPosition end)
-         {
-             // extend ourselves to cover any ranges we overlap
-             // record directly preceding our end may extend past us, so take the max of our end and its
-             Map.Entry<CommitLogPosition, CommitLogPosition> extend = ranges.floorEntry(end);
-             if (extend != null && extend.getValue().compareTo(end) > 0)
-                 end = extend.getValue();
- 
-             // record directly preceding our start may extend into us; if it does, we take it as our start
-             extend = ranges.lowerEntry(start);
-             if (extend != null && extend.getValue().compareTo(start) >= 0)
-                 start = extend.getKey();
- 
-             ranges.subMap(start, end).clear();
-             ranges.put(start, end);
-         }
- 
-         public boolean shouldReplay(CommitLogPosition position)
-         {
-             // replay ranges are start exclusive, end inclusive
-             Map.Entry<CommitLogPosition, CommitLogPosition> range = persisted.lowerEntry(position);
-             return range == null || position.compareTo(range.getValue()) > 0;
-         }
- 
-         public boolean isEmpty()
-         {
-             return persisted.isEmpty();
-         }
++        if (truncatedAt != null)
++            builder.add(CommitLogPosition.NONE, truncatedAt);
++        return builder.build();
 +    }
 +
-     public static CommitLogPosition firstNotCovered(Iterable<ReplayPositionFilter> ranges)
++    /**
++     * Find the earliest commit log position that is not covered by the known flushed ranges for some table.
++     *
++     * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the
++     * given table was constructed* and hence we can start replay from the end of that interval.
++     *
++     * If such an interval is not known, we must replay from the beginning.
++     *
++     * * This is only untrue if the very first flush of a table stalled or failed while a second or later
++     *   flush succeeded. The chances of this happening are very low, and even if the assumption does prove
++     *   incorrect during replay, the affected deployment is unlikely to be in production.
++     */
++    public static CommitLogPosition firstNotCovered(Collection<IntervalSet<CommitLogPosition>> ranges)
 +    {
-         CommitLogPosition min = null;
-         for (ReplayPositionFilter map : ranges)
-         {
-             CommitLogPosition first = map.persisted.firstEntry().getValue();
-             if (min == null)
-                 min = first;
-             else
-                 min = Ordering.natural().min(min, first);
-         }
-         if (min == null)
-             return CommitLogPosition.NONE;
-         return min;
++        return ranges.stream()
++                .map(intervals -> Iterables.getFirst(intervals.ends(), CommitLogPosition.NONE)) 
++                .min(Ordering.natural())
++                .get(); // iteration is per known-CF, there must be at least one. 
      }
  
      abstract static class ReplayFilter
@@@ -386,12 -323,346 +352,11 @@@
       *
       * @return true iff replay is necessary
       */
 -    private boolean shouldReplay(UUID cfId, ReplayPosition position)
 +    private boolean shouldReplay(UUID cfId, CommitLogPosition position)
      {
-         ReplayPositionFilter filter = cfPersisted.get(cfId);
-         return filter == null || filter.shouldReplay(position);
+         return !cfPersisted.get(cfId).contains(position);
      }
  
 -    @SuppressWarnings("resource")
 -    public void recover(File file, boolean tolerateTruncation) throws IOException
 -    {
 -        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 -        try(ChannelProxy channel = new ChannelProxy(file);
 -            RandomAccessReader reader = RandomAccessReader.open(channel))
 -        {
 -            if (desc.version < CommitLogDescriptor.VERSION_21)
 -            {
 -                if (logAndCheckIfShouldSkip(file, desc))
 -                    return;
 -                if (globalPosition.segment == desc.id)
 -                    reader.seek(globalPosition.position);
 -                replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
 -                return;
 -            }
 -
 -            final long segmentId = desc.id;
 -            try
 -            {
 -                desc = CommitLogDescriptor.readHeader(reader);
 -            }
 -            catch (IOException e)
 -            {
 -                desc = null;
 -            }
 -            if (desc == null) {
 -                handleReplayError(false, "Could not read commit log descriptor in file %s", file);
 -                return;
 -            }
 -            if (segmentId != desc.id)
 -            {
 -                handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
 -                // continue processing if ignored.
 -            }
 -
 -            if (logAndCheckIfShouldSkip(file, desc))
 -                return;
 -
 -            ICompressor compressor = null;
 -            if (desc.compression != null)
 -            {
 -                try
 -                {
 -                    compressor = CompressionParams.createCompressor(desc.compression);
 -                }
 -                catch (ConfigurationException e)
 -                {
 -                    handleReplayError(false, "Unknown compression: %s", e.getMessage());
 -                    return;
 -                }
 -            }
 -
 -            assert reader.length() <= Integer.MAX_VALUE;
 -            int end = (int) reader.getFilePointer();
 -            int replayEnd = end;
 -
 -            while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
 -            {
 -                int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
 -
 -                if (logger.isTraceEnabled())
 -                    logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
 -                if (compressor != null)
 -                {
 -                    int uncompressedLength = reader.readInt();
 -                    replayEnd = replayPos + uncompressedLength;
 -                }
 -                else
 -                {
 -                    replayEnd = end;
 -                }
 -
 -                if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
 -                    // Skip over flushed section.
 -                    continue;
 -
 -                FileDataInput sectionReader = reader;
 -                String errorContext = desc.fileName();
 -                // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
 -                boolean tolerateErrorsInSection = tolerateTruncation;
 -                if (compressor != null)
 -                {
 -                    // In the compressed case we know if this is the last section.
 -                    tolerateErrorsInSection &= end == reader.length() || end < 0;
 -
 -                    int start = (int) reader.getFilePointer();
 -                    try
 -                    {
 -                        int compressedLength = end - start;
 -                        if (logger.isTraceEnabled())
 -                            logger.trace("Decompressing {} between replay positions {} and {}",
 -                                         file,
 -                                         replayPos,
 -                                         replayEnd);
 -                        if (compressedLength > buffer.length)
 -                            buffer = new byte[(int) (1.2 * compressedLength)];
 -                        reader.readFully(buffer, 0, compressedLength);
 -                        int uncompressedLength = replayEnd - replayPos;
 -                        if (uncompressedLength > uncompressedBuffer.length)
 -                            uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
 -                        compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
 -                        sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
 -                        errorContext = "compressed section at " + start + " in " + errorContext;
 -                    }
 -                    catch (IOException | ArrayIndexOutOfBoundsException e)
 -                    {
 -                        handleReplayError(tolerateErrorsInSection,
 -                                          "Unexpected exception decompressing section at %d: %s",
 -                                          start, e);
 -                        continue;
 -                    }
 -                }
 -
 -                if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
 -                    break;
 -            }
 -            logger.debug("Finished reading {}", file);
 -        }
 -    }
 -
 -    public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
 -    {
 -        logger.debug("Replaying {} (CL version {}, messaging version {}, compression {})",
 -                    file.getPath(),
 -                    desc.version,
 -                    desc.getMessagingVersion(),
 -                    desc.compression);
 -
 -        if (globalPosition.segment > desc.id)
 -        {
 -            logger.trace("skipping replay of fully-flushed {}", file);
 -            return true;
 -        }
 -        return false;
 -    }
 -
 -    /**
 -     * Replays a sync section containing a list of mutations.
 -     *
 -     * @return Whether replay should continue with the next section.
 -     */
 -    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
 -    {
 -         /* read the logs populate Mutation and apply */
 -        while (reader.getFilePointer() < end && !reader.isEOF())
 -        {
 -            long mutationStart = reader.getFilePointer();
 -            if (logger.isTraceEnabled())
 -                logger.trace("Reading mutation at {}", mutationStart);
 -
 -            long claimedCRC32;
 -            int serializedSize;
 -            try
 -            {
 -                // any of the reads may hit EOF
 -                serializedSize = reader.readInt();
 -                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 -                {
 -                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
 -                    return false;
 -                }
 -
 -                // Mutation must be at LEAST 10 bytes:
 -                // 3 each for a non-empty Keyspace and Key (including the
 -                // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
 -                // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
 -                if (serializedSize < 10)
 -                {
 -                    handleReplayError(tolerateErrors,
 -                                      "Invalid mutation size %d at %d in %s",
 -                                      serializedSize, mutationStart, errorContext);
 -                    return false;
 -                }
 -
 -                long claimedSizeChecksum;
 -                if (desc.version < CommitLogDescriptor.VERSION_21)
 -                    claimedSizeChecksum = reader.readLong();
 -                else
 -                    claimedSizeChecksum = reader.readInt() & 0xffffffffL;
 -                checksum.reset();
 -                if (desc.version < CommitLogDescriptor.VERSION_20)
 -                    checksum.update(serializedSize);
 -                else
 -                    updateChecksumInt(checksum, serializedSize);
 -
 -                if (checksum.getValue() != claimedSizeChecksum)
 -                {
 -                    handleReplayError(tolerateErrors,
 -                                      "Mutation size checksum failure at %d in %s",
 -                                      mutationStart, errorContext);
 -                    return false;
 -                }
 -                // ok.
 -
 -                if (serializedSize > buffer.length)
 -                    buffer = new byte[(int) (1.2 * serializedSize)];
 -                reader.readFully(buffer, 0, serializedSize);
 -                if (desc.version < CommitLogDescriptor.VERSION_21)
 -                    claimedCRC32 = reader.readLong();
 -                else
 -                    claimedCRC32 = reader.readInt() & 0xffffffffL;
 -            }
 -            catch (EOFException eof)
 -            {
 -                handleReplayError(tolerateErrors,
 -                                  "Unexpected end of segment",
 -                                  mutationStart, errorContext);
 -                return false; // last CL entry didn't get completely written. that's ok.
 -            }
 -
 -            checksum.update(buffer, 0, serializedSize);
 -            if (claimedCRC32 != checksum.getValue())
 -            {
 -                handleReplayError(tolerateErrors,
 -                                  "Mutation checksum failure at %d in %s",
 -                                  mutationStart, errorContext);
 -                continue;
 -            }
 -            replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
 -        }
 -        return true;
 -    }
 -
 -    /**
 -     * Deserializes and replays a commit log entry.
 -     */
 -    void replayMutation(byte[] inputBuffer, int size,
 -            final int entryLocation, final CommitLogDescriptor desc) throws IOException
 -    {
 -
 -        final Mutation mutation;
 -        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
 -        {
 -            mutation = Mutation.serializer.deserialize(bufIn,
 -                                                       desc.getMessagingVersion(),
 -                                                       SerializationHelper.Flag.LOCAL);
 -            // doublecheck that what we read is [still] valid for the current schema
 -            for (PartitionUpdate upd : mutation.getPartitionUpdates())
 -                upd.validate();
 -        }
 -        catch (UnknownColumnFamilyException ex)
 -        {
 -            if (ex.cfId == null)
 -                return;
 -            AtomicInteger i = invalidMutations.get(ex.cfId);
 -            if (i == null)
 -            {
 -                i = new AtomicInteger(1);
 -                invalidMutations.put(ex.cfId, i);
 -            }
 -            else
 -                i.incrementAndGet();
 -            return;
 -        }
 -        catch (Throwable t)
 -        {
 -            JVMStabilityInspector.inspectThrowable(t);
 -            File f = File.createTempFile("mutation", "dat");
 -
 -            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
 -            {
 -                out.write(inputBuffer, 0, size);
 -            }
 -
 -            // Checksum passed so this error can't be permissible.
 -            handleReplayError(false,
 -                              "Unexpected error deserializing mutation; saved to %s.  " +
 -                              "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
 -                              "Exception follows: %s",
 -                              f.getAbsolutePath(),
 -                              t);
 -            return;
 -        }
 -
 -        if (logger.isTraceEnabled())
 -            logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
 -
 -        Runnable runnable = new WrappedRunnable()
 -        {
 -            public void runMayThrow()
 -            {
 -                if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 -                    return;
 -                if (pointInTimeExceeded(mutation))
 -                    return;
 -
 -                final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 -
 -                // Rebuild the mutation, omitting column families that
 -                //    a) the user has requested that we ignore,
 -                //    b) have already been flushed,
 -                // or c) are part of a cf that was dropped.
 -                // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
 -                Mutation newMutation = null;
 -                for (PartitionUpdate update : replayFilter.filter(mutation))
 -                {
 -                    if (Schema.instance.getCF(update.metadata().cfId) == null)
 -                        continue; // dropped
 -
 -                    // replay if current segment is newer than last flushed one or,
 -                    // if it is the last known segment, if we are after the replay position
 -                    if (shouldReplay(update.metadata().cfId, new ReplayPosition(desc.id, entryLocation)))
 -                    {
 -                        if (newMutation == null)
 -                            newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 -                        newMutation.add(update);
 -                        replayedCount.incrementAndGet();
 -                    }
 -                }
 -                if (newMutation != null)
 -                {
 -                    assert !newMutation.isEmpty();
 -
 -                    try
 -                    {
 -                        Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
 -                    }
 -                    catch (ExecutionException e)
 -                    {
 -                        throw Throwables.propagate(e.getCause());
 -                    }
 -
 -                    keyspacesRecovered.add(keyspace);
 -                }
 -            }
 -        };
 -        futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
 -        if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
 -        {
 -            FBUtilities.waitOnFutures(futures);
 -            futures.clear();
 -        }
 -    }
 -
      protected boolean pointInTimeExceeded(Mutation fm)
      {
          long restoreTarget = archiver.restorePointInTime;

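To summarize the replay-start computation above: persistedIntervals collects, per table, the commit log positions already covered by the sstables on disk (plus anything before a truncation record), and firstNotCovered starts global replay at the smallest "end of the first covered interval" across tables; a table with no covered intervals contributes CommitLogPosition.NONE and forces replay from the beginning. A minimal sketch with plain long positions (a simplification; the real code uses IntervalSet<CommitLogPosition> and per-table contains() checks during replay):

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    public class ReplayStartSketch
    {
        // Stand-in for one table's persisted intervals: here just the first
        // contiguous covered interval [start, end]; Cassandra keeps a full set.
        static final class Covered
        {
            final long start, end;
            Covered(long start, long end) { this.start = start; this.end = end; }
        }

        // Mirrors firstNotCovered: take the end of the first covered interval of
        // each table and start global replay from the smallest of those, since
        // everything before it is already persisted for every table.
        static long firstNotCovered(Collection<Covered> perTable)
        {
            return perTable.stream()
                           .mapToLong(c -> c.end)
                           .min()
                           .orElse(0L); // no covered interval known: replay from the beginning
        }

        public static void main(String[] args)
        {
            List<Covered> perTable = Arrays.asList(new Covered(0, 1000),  // table A flushed up to 1000
                                                   new Covered(0, 250),   // table B flushed up to 250
                                                   new Covered(0, 4000)); // table C flushed up to 4000
            System.out.println(firstNotCovered(perTable)); // 250: table B still needs everything after that
        }
    }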
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a1158be,d2f12bf..e32c204
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -32,13 -40,16 +32,14 @@@ import org.cliffc.high_scale_lib.NonBlo
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
  import org.apache.cassandra.io.FSWriteError;
 -import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.utils.CLibrary;
+ import org.apache.cassandra.utils.IntegerInterval;
  import org.apache.cassandra.utils.concurrent.OpOrder;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
  
@@@ -461,22 -448,18 +475,19 @@@ public abstract class CommitLogSegmen
       * given context argument is contained in this file, it will only mark the CF as
       * clean if no newer writes have taken place.
       *
--     * @param cfId    the column family ID that is now clean
--     * @param context the optional clean offset
++     * @param cfId           the column family ID that is now clean
++     * @param startPosition  the start of the range that is clean
++     * @param endPosition    the end of the range that is clean
       */
-     public synchronized void markClean(UUID cfId, CommitLogPosition context)
 -    public synchronized void markClean(UUID cfId, ReplayPosition startPosition, ReplayPosition endPosition)
++    public synchronized void markClean(UUID cfId, CommitLogPosition startPosition, CommitLogPosition endPosition)
      {
 -        if (startPosition.segment > id || endPosition.segment < id)
++        if (startPosition.segmentId > id || endPosition.segmentId < id)
+             return;
          if (!cfDirty.containsKey(cfId))
              return;
-         if (context.segmentId == id)
-             markClean(cfId, context.position);
-         else if (context.segmentId > id)
-             markClean(cfId, Integer.MAX_VALUE);
-     }
- 
-     private void markClean(UUID cfId, int position)
-     {
-         ensureAtleast(cfClean, cfId, position);
 -        int start = startPosition.segment == id ? startPosition.position : 0;
 -        int end = endPosition.segment == id ? endPosition.position : Integer.MAX_VALUE;
++        int start = startPosition.segmentId == id ? startPosition.position : 0;
++        int end = endPosition.segmentId == id ? endPosition.position : Integer.MAX_VALUE;
+         cfClean.computeIfAbsent(cfId, k -> new IntegerInterval.Set()).add(start, end);
          removeCleanFromDirty();
      }
  

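The clamping in markClean above can also be shown in isolation. A hedged sketch (Position is a hypothetical stand-in for CommitLogPosition, and the int[] result stands in for the interval handed to the per-CF clean set): a flushed range [lowerBound, upperBound] only touches segments whose id falls inside it, and within such a segment the range is clamped to 0 or Integer.MAX_VALUE on whichever side lies in another segment.

    public class MarkCleanSketch
    {
        // Hypothetical position: which segment, and an offset within it.
        static final class Position
        {
            final long segmentId;
            final int offset;
            Position(long segmentId, int offset) { this.segmentId = segmentId; this.offset = offset; }
        }

        // Mirrors the clamping in markClean: a flushed range that starts in an
        // older segment covers this segment from offset 0, and one that ends in
        // a newer segment covers it through Integer.MAX_VALUE.
        static int[] cleanRangeForSegment(long segmentId, Position lowerBound, Position upperBound)
        {
            if (lowerBound.segmentId > segmentId || upperBound.segmentId < segmentId)
                return null; // the flushed range does not touch this segment at all
            int start = lowerBound.segmentId == segmentId ? lowerBound.offset : 0;
            int end = upperBound.segmentId == segmentId ? upperBound.offset : Integer.MAX_VALUE;
            return new int[]{ start, end };
        }

        public static void main(String[] args)
        {
            Position lower = new Position(5, 1234);
            Position upper = new Position(7, 42);
            System.out.println(java.util.Arrays.toString(cleanRangeForSegment(4, lower, upper))); // null
            System.out.println(java.util.Arrays.toString(cleanRangeForSegment(5, lower, upper))); // [1234, 2147483647]
            System.out.println(java.util.Arrays.toString(cleanRangeForSegment(6, lower, upper))); // [0, 2147483647]
            System.out.println(java.util.Arrays.toString(cleanRangeForSegment(7, lower, upper))); // [0, 42]
        }
    }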
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------